| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- // Package main provides a demonstration server showing SeaweedFS RDMA integration
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "os"
- "os/signal"
- "strconv"
- "strings"
- "syscall"
- "time"
- "seaweedfs-rdma-sidecar/pkg/seaweedfs"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/sirupsen/logrus"
- "github.com/spf13/cobra"
- )
// Command-line configuration. Every variable below is bound to a cobra flag
// in main and read afterwards by runServer and the HTTP handlers.
var (
	// HTTP listener.

	// port is the TCP port the demo HTTP server listens on (--port).
	port int

	// RDMA engine and fallback endpoints.

	// rdmaSocket is the path of the RDMA engine Unix socket (--rdma-socket).
	rdmaSocket string
	// volumeServerURL is the SeaweedFS volume server used for HTTP fallback
	// (--volume-server).
	volumeServerURL string
	// enableRDMA toggles RDMA acceleration (--enable-rdma).
	enableRDMA bool

	// Zero-copy via temp files.

	// enableZeroCopy toggles the temp-file based zero-copy optimization
	// (--enable-zerocopy).
	enableZeroCopy bool
	// tempDir is the directory that holds zero-copy temp files (--temp-dir).
	tempDir string

	// Connection pooling.

	// enablePooling toggles RDMA connection pooling (--enable-pooling).
	enablePooling bool
	// maxConnections caps the number of pooled RDMA connections
	// (--max-connections).
	maxConnections int
	// maxIdleTime is how long a pooled connection may stay idle
	// (--max-idle-time).
	maxIdleTime time.Duration

	// Diagnostics.

	// debug switches the logger to debug level (--debug).
	debug bool
)
- func main() {
- var rootCmd = &cobra.Command{
- Use: "demo-server",
- Short: "SeaweedFS RDMA integration demonstration server",
- Long: `Demonstration server that shows how SeaweedFS can integrate with the RDMA sidecar
- for accelerated read operations. This server provides HTTP endpoints that demonstrate
- the RDMA fast path with HTTP fallback capabilities.`,
- RunE: runServer,
- }
- rootCmd.Flags().IntVarP(&port, "port", "p", 8080, "Demo server HTTP port")
- rootCmd.Flags().StringVarP(&rdmaSocket, "rdma-socket", "r", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
- rootCmd.Flags().StringVarP(&volumeServerURL, "volume-server", "v", "http://localhost:8080", "SeaweedFS volume server URL for HTTP fallback")
- rootCmd.Flags().BoolVarP(&enableRDMA, "enable-rdma", "e", true, "Enable RDMA acceleration")
- rootCmd.Flags().BoolVarP(&enableZeroCopy, "enable-zerocopy", "z", true, "Enable zero-copy optimization via temp files")
- rootCmd.Flags().StringVarP(&tempDir, "temp-dir", "t", "/tmp/rdma-cache", "Temp directory for zero-copy files")
- rootCmd.Flags().BoolVar(&enablePooling, "enable-pooling", true, "Enable RDMA connection pooling")
- rootCmd.Flags().IntVar(&maxConnections, "max-connections", 10, "Maximum connections in RDMA pool")
- rootCmd.Flags().DurationVar(&maxIdleTime, "max-idle-time", 5*time.Minute, "Maximum idle time for pooled connections")
- rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
- if err := rootCmd.Execute(); err != nil {
- fmt.Fprintf(os.Stderr, "Error: %v\n", err)
- os.Exit(1)
- }
- }
- func runServer(cmd *cobra.Command, args []string) error {
- // Setup logging
- logger := logrus.New()
- if debug {
- logger.SetLevel(logrus.DebugLevel)
- logger.SetFormatter(&logrus.TextFormatter{
- FullTimestamp: true,
- ForceColors: true,
- })
- } else {
- logger.SetLevel(logrus.InfoLevel)
- }
- logger.WithFields(logrus.Fields{
- "port": port,
- "rdma_socket": rdmaSocket,
- "volume_server_url": volumeServerURL,
- "enable_rdma": enableRDMA,
- "enable_zerocopy": enableZeroCopy,
- "temp_dir": tempDir,
- "enable_pooling": enablePooling,
- "max_connections": maxConnections,
- "max_idle_time": maxIdleTime,
- "debug": debug,
- }).Info("🚀 Starting SeaweedFS RDMA Demo Server")
- // Create SeaweedFS RDMA client
- config := &seaweedfs.Config{
- RDMASocketPath: rdmaSocket,
- VolumeServerURL: volumeServerURL,
- Enabled: enableRDMA,
- DefaultTimeout: 30 * time.Second,
- Logger: logger,
- TempDir: tempDir,
- UseZeroCopy: enableZeroCopy,
- EnablePooling: enablePooling,
- MaxConnections: maxConnections,
- MaxIdleTime: maxIdleTime,
- }
- rdmaClient, err := seaweedfs.NewSeaweedFSRDMAClient(config)
- if err != nil {
- return fmt.Errorf("failed to create RDMA client: %w", err)
- }
- // Start RDMA client
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- if err := rdmaClient.Start(ctx); err != nil {
- logger.WithError(err).Error("Failed to start RDMA client")
- }
- cancel()
- // Create demo server
- server := &DemoServer{
- rdmaClient: rdmaClient,
- logger: logger,
- }
- // Setup HTTP routes
- mux := http.NewServeMux()
- mux.HandleFunc("/", server.homeHandler)
- mux.HandleFunc("/health", server.healthHandler)
- mux.HandleFunc("/stats", server.statsHandler)
- mux.HandleFunc("/read", server.readHandler)
- mux.HandleFunc("/benchmark", server.benchmarkHandler)
- mux.HandleFunc("/cleanup", server.cleanupHandler)
- httpServer := &http.Server{
- Addr: fmt.Sprintf(":%d", port),
- Handler: mux,
- }
- // Handle graceful shutdown
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- logger.WithField("port", port).Info("🌐 Demo server starting")
- if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- logger.WithError(err).Fatal("HTTP server failed")
- }
- }()
- // Wait for shutdown signal
- <-sigChan
- logger.Info("📡 Received shutdown signal, gracefully shutting down...")
- // Shutdown HTTP server
- shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer shutdownCancel()
- if err := httpServer.Shutdown(shutdownCtx); err != nil {
- logger.WithError(err).Error("HTTP server shutdown failed")
- } else {
- logger.Info("🌐 HTTP server shutdown complete")
- }
- // Stop RDMA client
- rdmaClient.Stop()
- logger.Info("🛑 Demo server shutdown complete")
- return nil
- }
- // DemoServer demonstrates SeaweedFS RDMA integration
- type DemoServer struct {
- rdmaClient *seaweedfs.SeaweedFSRDMAClient
- logger *logrus.Logger
- }
- // homeHandler provides information about the demo server
- func (s *DemoServer) homeHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- w.Header().Set("Content-Type", "text/html")
- fmt.Fprintf(w, `<!DOCTYPE html>
- <html>
- <head>
- <title>SeaweedFS RDMA Demo Server</title>
- <style>
- body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }
- .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
- h1 { color: #2c3e50; }
- .endpoint { margin: 20px 0; padding: 15px; background: #ecf0f1; border-radius: 4px; }
- .endpoint h3 { margin: 0 0 10px 0; color: #34495e; }
- .endpoint a { color: #3498db; text-decoration: none; }
- .endpoint a:hover { text-decoration: underline; }
- .status { padding: 10px; border-radius: 4px; margin: 10px 0; }
- .status.enabled { background: #d5f4e6; color: #27ae60; }
- .status.disabled { background: #fadbd8; color: #e74c3c; }
- </style>
- </head>
- <body>
- <div class="container">
- <h1>🚀 SeaweedFS RDMA Demo Server</h1>
- <p>This server demonstrates SeaweedFS integration with RDMA acceleration for high-performance reads.</p>
-
- <div class="status %s">
- <strong>RDMA Status:</strong> %s
- </div>
- <h2>📋 Available Endpoints</h2>
-
- <div class="endpoint">
- <h3>🏥 Health Check</h3>
- <p><a href="/health">/health</a> - Check server and RDMA engine health</p>
- </div>
- <div class="endpoint">
- <h3>📊 Statistics</h3>
- <p><a href="/stats">/stats</a> - Get RDMA client statistics and capabilities</p>
- </div>
- <div class="endpoint">
- <h3>📖 Read Needle</h3>
- <p><a href="/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080">/read</a> - Read a needle with RDMA fast path</p>
- <p><strong>Parameters:</strong> file_id OR (volume, needle, cookie), volume_server, offset (optional), size (optional)</p>
- </div>
- <div class="endpoint">
- <h3>🏁 Benchmark</h3>
- <p><a href="/benchmark?iterations=10&size=4096">/benchmark</a> - Run performance benchmark</p>
- <p><strong>Parameters:</strong> iterations (default: 10), size (default: 4096)</p>
- </div>
- <h2>📝 Example Usage</h2>
- <pre>
- # Read a needle using file ID (recommended)
- curl "http://localhost:%d/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080"
- # Read a needle using individual parameters (legacy)
- curl "http://localhost:%d/read?volume=1&needle=12345&cookie=305419896&size=1024&volume_server=http://localhost:8080"
- # Read a needle (hex cookie)
- curl "http://localhost:%d/read?volume=1&needle=12345&cookie=0x12345678&size=1024&volume_server=http://localhost:8080"
- # Run benchmark
- curl "http://localhost:%d/benchmark?iterations=5&size=2048"
- # Check health
- curl "http://localhost:%d/health"
- </pre>
- </div>
- </body>
- </html>`,
- map[bool]string{true: "enabled", false: "disabled"}[s.rdmaClient.IsEnabled()],
- map[bool]string{true: "RDMA Enabled ✅", false: "RDMA Disabled (HTTP Fallback Only) ⚠️"}[s.rdmaClient.IsEnabled()],
- port, port, port, port)
- }
- // healthHandler checks server and RDMA health
- func (s *DemoServer) healthHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
- defer cancel()
- health := map[string]interface{}{
- "status": "healthy",
- "timestamp": time.Now().Format(time.RFC3339),
- "rdma": map[string]interface{}{
- "enabled": false,
- "connected": false,
- },
- }
- if s.rdmaClient != nil {
- health["rdma"].(map[string]interface{})["enabled"] = s.rdmaClient.IsEnabled()
- health["rdma"].(map[string]interface{})["type"] = "local"
- if s.rdmaClient.IsEnabled() {
- if err := s.rdmaClient.HealthCheck(ctx); err != nil {
- s.logger.WithError(err).Warn("RDMA health check failed")
- health["rdma"].(map[string]interface{})["error"] = err.Error()
- } else {
- health["rdma"].(map[string]interface{})["connected"] = true
- }
- }
- }
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(health)
- }
- // statsHandler returns RDMA statistics
- func (s *DemoServer) statsHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- var stats map[string]interface{}
- if s.rdmaClient != nil {
- stats = s.rdmaClient.GetStats()
- stats["client_type"] = "local"
- } else {
- stats = map[string]interface{}{
- "client_type": "none",
- "error": "no RDMA client available",
- }
- }
- stats["timestamp"] = time.Now().Format(time.RFC3339)
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(stats)
- }
- // readHandler demonstrates needle reading with RDMA
- func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- // Parse parameters - support both file_id and individual parameters for backward compatibility
- query := r.URL.Query()
- volumeServer := query.Get("volume_server")
- fileID := query.Get("file_id")
- var volumeID, cookie uint64
- var needleID uint64
- var err error
- if fileID != "" {
- // Use file ID format (e.g., "3,01637037d6")
- // Extract individual components using existing SeaweedFS parsing
- fid, parseErr := needle.ParseFileIdFromString(fileID)
- if parseErr != nil {
- http.Error(w, fmt.Sprintf("invalid 'file_id' parameter: %v", parseErr), http.StatusBadRequest)
- return
- }
- volumeID = uint64(fid.VolumeId)
- needleID = uint64(fid.Key)
- cookie = uint64(fid.Cookie)
- } else {
- // Use individual parameters (backward compatibility)
- volumeID, err = strconv.ParseUint(query.Get("volume"), 10, 32)
- if err != nil {
- http.Error(w, "invalid 'volume' parameter", http.StatusBadRequest)
- return
- }
- needleID, err = strconv.ParseUint(query.Get("needle"), 10, 64)
- if err != nil {
- http.Error(w, "invalid 'needle' parameter", http.StatusBadRequest)
- return
- }
- // Parse cookie parameter - support both decimal and hexadecimal formats
- cookieStr := query.Get("cookie")
- if strings.HasPrefix(strings.ToLower(cookieStr), "0x") {
- // Parse as hexadecimal (remove "0x" prefix)
- cookie, err = strconv.ParseUint(cookieStr[2:], 16, 32)
- } else {
- // Parse as decimal (default)
- cookie, err = strconv.ParseUint(cookieStr, 10, 32)
- }
- if err != nil {
- http.Error(w, "invalid 'cookie' parameter (expected decimal or hex with 0x prefix)", http.StatusBadRequest)
- return
- }
- }
- var offset uint64
- if offsetStr := query.Get("offset"); offsetStr != "" {
- var parseErr error
- offset, parseErr = strconv.ParseUint(offsetStr, 10, 64)
- if parseErr != nil {
- http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
- return
- }
- }
- var size uint64
- if sizeStr := query.Get("size"); sizeStr != "" {
- var parseErr error
- size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
- if parseErr != nil {
- http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
- return
- }
- }
- if volumeServer == "" {
- http.Error(w, "volume_server parameter is required", http.StatusBadRequest)
- return
- }
- if volumeID == 0 || needleID == 0 {
- http.Error(w, "volume and needle parameters are required", http.StatusBadRequest)
- return
- }
- // Note: cookie and size can have defaults for demo purposes when user provides empty values,
- // but invalid parsing is caught above with proper error responses
- if cookie == 0 {
- cookie = 0x12345678 // Default cookie for demo
- }
- if size == 0 {
- size = 4096 // Default size
- }
- logFields := logrus.Fields{
- "volume_server": volumeServer,
- "volume_id": volumeID,
- "needle_id": needleID,
- "cookie": fmt.Sprintf("0x%x", cookie),
- "offset": offset,
- "size": size,
- }
- if fileID != "" {
- logFields["file_id"] = fileID
- }
- s.logger.WithFields(logFields).Info("📖 Processing needle read request")
- ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
- defer cancel()
- start := time.Now()
- req := &seaweedfs.NeedleReadRequest{
- VolumeID: uint32(volumeID),
- NeedleID: needleID,
- Cookie: uint32(cookie),
- Offset: offset,
- Size: size,
- VolumeServer: volumeServer,
- }
- resp, err := s.rdmaClient.ReadNeedle(ctx, req)
- if err != nil {
- s.logger.WithError(err).Error("❌ Needle read failed")
- http.Error(w, fmt.Sprintf("Read failed: %v", err), http.StatusInternalServerError)
- return
- }
- duration := time.Since(start)
- s.logger.WithFields(logrus.Fields{
- "volume_id": volumeID,
- "needle_id": needleID,
- "is_rdma": resp.IsRDMA,
- "source": resp.Source,
- "duration": duration,
- "data_size": len(resp.Data),
- }).Info("✅ Needle read completed")
- // Return metadata and first few bytes
- result := map[string]interface{}{
- "success": true,
- "volume_id": volumeID,
- "needle_id": needleID,
- "cookie": fmt.Sprintf("0x%x", cookie),
- "is_rdma": resp.IsRDMA,
- "source": resp.Source,
- "session_id": resp.SessionID,
- "duration": duration.String(),
- "data_size": len(resp.Data),
- "timestamp": time.Now().Format(time.RFC3339),
- "use_temp_file": resp.UseTempFile,
- "temp_file": resp.TempFilePath,
- }
- // Set headers for zero-copy optimization
- if resp.UseTempFile && resp.TempFilePath != "" {
- w.Header().Set("X-Use-Temp-File", "true")
- w.Header().Set("X-Temp-File", resp.TempFilePath)
- w.Header().Set("X-Source", resp.Source)
- w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
- // For zero-copy, return minimal JSON response and let client read from temp file
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(result)
- return
- }
- // Regular response with data
- w.Header().Set("X-Source", resp.Source)
- w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
- // Include first 32 bytes as hex for verification
- if len(resp.Data) > 0 {
- displayLen := 32
- if len(resp.Data) < displayLen {
- displayLen = len(resp.Data)
- }
- result["data_preview"] = fmt.Sprintf("%x", resp.Data[:displayLen])
- }
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(result)
- }
- // benchmarkHandler runs performance benchmarks
- func (s *DemoServer) benchmarkHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- // Parse parameters
- query := r.URL.Query()
- iterations := 10 // default value
- if iterationsStr := query.Get("iterations"); iterationsStr != "" {
- var parseErr error
- iterations, parseErr = strconv.Atoi(iterationsStr)
- if parseErr != nil {
- http.Error(w, "invalid 'iterations' parameter", http.StatusBadRequest)
- return
- }
- }
- size := uint64(4096) // default value
- if sizeStr := query.Get("size"); sizeStr != "" {
- var parseErr error
- size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
- if parseErr != nil {
- http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
- return
- }
- }
- if iterations <= 0 {
- iterations = 10
- }
- if size == 0 {
- size = 4096
- }
- s.logger.WithFields(logrus.Fields{
- "iterations": iterations,
- "size": size,
- }).Info("🏁 Starting benchmark")
- ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
- defer cancel()
- var rdmaSuccessful, rdmaFailed, httpSuccessful, httpFailed int
- var totalDuration time.Duration
- var totalBytes uint64
- startTime := time.Now()
- for i := 0; i < iterations; i++ {
- req := &seaweedfs.NeedleReadRequest{
- VolumeID: 1,
- NeedleID: uint64(i + 1),
- Cookie: 0x12345678,
- Offset: 0,
- Size: size,
- }
- opStart := time.Now()
- resp, err := s.rdmaClient.ReadNeedle(ctx, req)
- opDuration := time.Since(opStart)
- if err != nil {
- httpFailed++
- continue
- }
- totalDuration += opDuration
- totalBytes += uint64(len(resp.Data))
- if resp.IsRDMA {
- rdmaSuccessful++
- } else {
- httpSuccessful++
- }
- }
- benchDuration := time.Since(startTime)
- // Calculate statistics
- totalOperations := rdmaSuccessful + httpSuccessful
- avgLatency := time.Duration(0)
- if totalOperations > 0 {
- avgLatency = totalDuration / time.Duration(totalOperations)
- }
- throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
- opsPerSec := float64(totalOperations) / benchDuration.Seconds()
- result := map[string]interface{}{
- "benchmark_results": map[string]interface{}{
- "iterations": iterations,
- "size_per_op": size,
- "total_duration": benchDuration.String(),
- "successful_ops": totalOperations,
- "failed_ops": rdmaFailed + httpFailed,
- "rdma_ops": rdmaSuccessful,
- "http_ops": httpSuccessful,
- "avg_latency": avgLatency.String(),
- "throughput_mbps": fmt.Sprintf("%.2f", throughputMBps),
- "ops_per_sec": fmt.Sprintf("%.1f", opsPerSec),
- "total_bytes": totalBytes,
- },
- "rdma_enabled": s.rdmaClient.IsEnabled(),
- "timestamp": time.Now().Format(time.RFC3339),
- }
- s.logger.WithFields(logrus.Fields{
- "iterations": iterations,
- "successful_ops": totalOperations,
- "rdma_ops": rdmaSuccessful,
- "http_ops": httpSuccessful,
- "avg_latency": avgLatency,
- "throughput_mbps": throughputMBps,
- "ops_per_sec": opsPerSec,
- }).Info("📊 Benchmark completed")
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(result)
- }
- // cleanupHandler handles temp file cleanup requests from mount clients
- func (s *DemoServer) cleanupHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodDelete {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- // Get temp file path from query parameters
- tempFilePath := r.URL.Query().Get("temp_file")
- if tempFilePath == "" {
- http.Error(w, "missing 'temp_file' parameter", http.StatusBadRequest)
- return
- }
- s.logger.WithField("temp_file", tempFilePath).Debug("🗑️ Processing cleanup request")
- // Use the RDMA client's cleanup method (which delegates to seaweedfs client)
- err := s.rdmaClient.CleanupTempFile(tempFilePath)
- if err != nil {
- s.logger.WithError(err).WithField("temp_file", tempFilePath).Warn("Failed to cleanup temp file")
- http.Error(w, fmt.Sprintf("cleanup failed: %v", err), http.StatusInternalServerError)
- return
- }
- s.logger.WithField("temp_file", tempFilePath).Debug("🧹 Temp file cleanup successful")
- // Return success response
- w.Header().Set("Content-Type", "application/json")
- response := map[string]interface{}{
- "success": true,
- "message": "temp file cleaned up successfully",
- "temp_file": tempFilePath,
- "timestamp": time.Now().Format(time.RFC3339),
- }
- json.NewEncoder(w).Encode(response)
- }
|