main.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. // Package main provides a demonstration server showing SeaweedFS RDMA integration
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "strconv"
  11. "strings"
  12. "syscall"
  13. "time"
  14. "seaweedfs-rdma-sidecar/pkg/seaweedfs"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/sirupsen/logrus"
  17. "github.com/spf13/cobra"
  18. )
  19. var (
  20. port int
  21. rdmaSocket string
  22. volumeServerURL string
  23. enableRDMA bool
  24. enableZeroCopy bool
  25. tempDir string
  26. enablePooling bool
  27. maxConnections int
  28. maxIdleTime time.Duration
  29. debug bool
  30. )
  31. func main() {
  32. var rootCmd = &cobra.Command{
  33. Use: "demo-server",
  34. Short: "SeaweedFS RDMA integration demonstration server",
  35. Long: `Demonstration server that shows how SeaweedFS can integrate with the RDMA sidecar
  36. for accelerated read operations. This server provides HTTP endpoints that demonstrate
  37. the RDMA fast path with HTTP fallback capabilities.`,
  38. RunE: runServer,
  39. }
  40. rootCmd.Flags().IntVarP(&port, "port", "p", 8080, "Demo server HTTP port")
  41. rootCmd.Flags().StringVarP(&rdmaSocket, "rdma-socket", "r", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
  42. rootCmd.Flags().StringVarP(&volumeServerURL, "volume-server", "v", "http://localhost:8080", "SeaweedFS volume server URL for HTTP fallback")
  43. rootCmd.Flags().BoolVarP(&enableRDMA, "enable-rdma", "e", true, "Enable RDMA acceleration")
  44. rootCmd.Flags().BoolVarP(&enableZeroCopy, "enable-zerocopy", "z", true, "Enable zero-copy optimization via temp files")
  45. rootCmd.Flags().StringVarP(&tempDir, "temp-dir", "t", "/tmp/rdma-cache", "Temp directory for zero-copy files")
  46. rootCmd.Flags().BoolVar(&enablePooling, "enable-pooling", true, "Enable RDMA connection pooling")
  47. rootCmd.Flags().IntVar(&maxConnections, "max-connections", 10, "Maximum connections in RDMA pool")
  48. rootCmd.Flags().DurationVar(&maxIdleTime, "max-idle-time", 5*time.Minute, "Maximum idle time for pooled connections")
  49. rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
  50. if err := rootCmd.Execute(); err != nil {
  51. fmt.Fprintf(os.Stderr, "Error: %v\n", err)
  52. os.Exit(1)
  53. }
  54. }
  55. func runServer(cmd *cobra.Command, args []string) error {
  56. // Setup logging
  57. logger := logrus.New()
  58. if debug {
  59. logger.SetLevel(logrus.DebugLevel)
  60. logger.SetFormatter(&logrus.TextFormatter{
  61. FullTimestamp: true,
  62. ForceColors: true,
  63. })
  64. } else {
  65. logger.SetLevel(logrus.InfoLevel)
  66. }
  67. logger.WithFields(logrus.Fields{
  68. "port": port,
  69. "rdma_socket": rdmaSocket,
  70. "volume_server_url": volumeServerURL,
  71. "enable_rdma": enableRDMA,
  72. "enable_zerocopy": enableZeroCopy,
  73. "temp_dir": tempDir,
  74. "enable_pooling": enablePooling,
  75. "max_connections": maxConnections,
  76. "max_idle_time": maxIdleTime,
  77. "debug": debug,
  78. }).Info("🚀 Starting SeaweedFS RDMA Demo Server")
  79. // Create SeaweedFS RDMA client
  80. config := &seaweedfs.Config{
  81. RDMASocketPath: rdmaSocket,
  82. VolumeServerURL: volumeServerURL,
  83. Enabled: enableRDMA,
  84. DefaultTimeout: 30 * time.Second,
  85. Logger: logger,
  86. TempDir: tempDir,
  87. UseZeroCopy: enableZeroCopy,
  88. EnablePooling: enablePooling,
  89. MaxConnections: maxConnections,
  90. MaxIdleTime: maxIdleTime,
  91. }
  92. rdmaClient, err := seaweedfs.NewSeaweedFSRDMAClient(config)
  93. if err != nil {
  94. return fmt.Errorf("failed to create RDMA client: %w", err)
  95. }
  96. // Start RDMA client
  97. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  98. if err := rdmaClient.Start(ctx); err != nil {
  99. logger.WithError(err).Error("Failed to start RDMA client")
  100. }
  101. cancel()
  102. // Create demo server
  103. server := &DemoServer{
  104. rdmaClient: rdmaClient,
  105. logger: logger,
  106. }
  107. // Setup HTTP routes
  108. mux := http.NewServeMux()
  109. mux.HandleFunc("/", server.homeHandler)
  110. mux.HandleFunc("/health", server.healthHandler)
  111. mux.HandleFunc("/stats", server.statsHandler)
  112. mux.HandleFunc("/read", server.readHandler)
  113. mux.HandleFunc("/benchmark", server.benchmarkHandler)
  114. mux.HandleFunc("/cleanup", server.cleanupHandler)
  115. httpServer := &http.Server{
  116. Addr: fmt.Sprintf(":%d", port),
  117. Handler: mux,
  118. }
  119. // Handle graceful shutdown
  120. sigChan := make(chan os.Signal, 1)
  121. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  122. go func() {
  123. logger.WithField("port", port).Info("🌐 Demo server starting")
  124. if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  125. logger.WithError(err).Fatal("HTTP server failed")
  126. }
  127. }()
  128. // Wait for shutdown signal
  129. <-sigChan
  130. logger.Info("📡 Received shutdown signal, gracefully shutting down...")
  131. // Shutdown HTTP server
  132. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
  133. defer shutdownCancel()
  134. if err := httpServer.Shutdown(shutdownCtx); err != nil {
  135. logger.WithError(err).Error("HTTP server shutdown failed")
  136. } else {
  137. logger.Info("🌐 HTTP server shutdown complete")
  138. }
  139. // Stop RDMA client
  140. rdmaClient.Stop()
  141. logger.Info("🛑 Demo server shutdown complete")
  142. return nil
  143. }
  144. // DemoServer demonstrates SeaweedFS RDMA integration
  145. type DemoServer struct {
  146. rdmaClient *seaweedfs.SeaweedFSRDMAClient
  147. logger *logrus.Logger
  148. }
  149. // homeHandler provides information about the demo server
  150. func (s *DemoServer) homeHandler(w http.ResponseWriter, r *http.Request) {
  151. if r.Method != http.MethodGet {
  152. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  153. return
  154. }
  155. w.Header().Set("Content-Type", "text/html")
  156. fmt.Fprintf(w, `<!DOCTYPE html>
  157. <html>
  158. <head>
  159. <title>SeaweedFS RDMA Demo Server</title>
  160. <style>
  161. body { font-family: Arial, sans-serif; margin: 40px; background-color: #f5f5f5; }
  162. .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
  163. h1 { color: #2c3e50; }
  164. .endpoint { margin: 20px 0; padding: 15px; background: #ecf0f1; border-radius: 4px; }
  165. .endpoint h3 { margin: 0 0 10px 0; color: #34495e; }
  166. .endpoint a { color: #3498db; text-decoration: none; }
  167. .endpoint a:hover { text-decoration: underline; }
  168. .status { padding: 10px; border-radius: 4px; margin: 10px 0; }
  169. .status.enabled { background: #d5f4e6; color: #27ae60; }
  170. .status.disabled { background: #fadbd8; color: #e74c3c; }
  171. </style>
  172. </head>
  173. <body>
  174. <div class="container">
  175. <h1>🚀 SeaweedFS RDMA Demo Server</h1>
  176. <p>This server demonstrates SeaweedFS integration with RDMA acceleration for high-performance reads.</p>
  177. <div class="status %s">
  178. <strong>RDMA Status:</strong> %s
  179. </div>
  180. <h2>📋 Available Endpoints</h2>
  181. <div class="endpoint">
  182. <h3>🏥 Health Check</h3>
  183. <p><a href="/health">/health</a> - Check server and RDMA engine health</p>
  184. </div>
  185. <div class="endpoint">
  186. <h3>📊 Statistics</h3>
  187. <p><a href="/stats">/stats</a> - Get RDMA client statistics and capabilities</p>
  188. </div>
  189. <div class="endpoint">
  190. <h3>📖 Read Needle</h3>
  191. <p><a href="/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080">/read</a> - Read a needle with RDMA fast path</p>
  192. <p><strong>Parameters:</strong> file_id OR (volume, needle, cookie), volume_server, offset (optional), size (optional)</p>
  193. </div>
  194. <div class="endpoint">
  195. <h3>🏁 Benchmark</h3>
  196. <p><a href="/benchmark?iterations=10&size=4096">/benchmark</a> - Run performance benchmark</p>
  197. <p><strong>Parameters:</strong> iterations (default: 10), size (default: 4096)</p>
  198. </div>
  199. <h2>📝 Example Usage</h2>
  200. <pre>
  201. # Read a needle using file ID (recommended)
  202. curl "http://localhost:%d/read?file_id=3,01637037d6&size=1024&volume_server=http://localhost:8080"
  203. # Read a needle using individual parameters (legacy)
  204. curl "http://localhost:%d/read?volume=1&needle=12345&cookie=305419896&size=1024&volume_server=http://localhost:8080"
  205. # Read a needle (hex cookie)
  206. curl "http://localhost:%d/read?volume=1&needle=12345&cookie=0x12345678&size=1024&volume_server=http://localhost:8080"
  207. # Run benchmark
  208. curl "http://localhost:%d/benchmark?iterations=5&size=2048"
  209. # Check health
  210. curl "http://localhost:%d/health"
  211. </pre>
  212. </div>
  213. </body>
  214. </html>`,
  215. map[bool]string{true: "enabled", false: "disabled"}[s.rdmaClient.IsEnabled()],
  216. map[bool]string{true: "RDMA Enabled ✅", false: "RDMA Disabled (HTTP Fallback Only) ⚠️"}[s.rdmaClient.IsEnabled()],
  217. port, port, port, port)
  218. }
  219. // healthHandler checks server and RDMA health
  220. func (s *DemoServer) healthHandler(w http.ResponseWriter, r *http.Request) {
  221. if r.Method != http.MethodGet {
  222. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  223. return
  224. }
  225. ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
  226. defer cancel()
  227. health := map[string]interface{}{
  228. "status": "healthy",
  229. "timestamp": time.Now().Format(time.RFC3339),
  230. "rdma": map[string]interface{}{
  231. "enabled": false,
  232. "connected": false,
  233. },
  234. }
  235. if s.rdmaClient != nil {
  236. health["rdma"].(map[string]interface{})["enabled"] = s.rdmaClient.IsEnabled()
  237. health["rdma"].(map[string]interface{})["type"] = "local"
  238. if s.rdmaClient.IsEnabled() {
  239. if err := s.rdmaClient.HealthCheck(ctx); err != nil {
  240. s.logger.WithError(err).Warn("RDMA health check failed")
  241. health["rdma"].(map[string]interface{})["error"] = err.Error()
  242. } else {
  243. health["rdma"].(map[string]interface{})["connected"] = true
  244. }
  245. }
  246. }
  247. w.Header().Set("Content-Type", "application/json")
  248. json.NewEncoder(w).Encode(health)
  249. }
  250. // statsHandler returns RDMA statistics
  251. func (s *DemoServer) statsHandler(w http.ResponseWriter, r *http.Request) {
  252. if r.Method != http.MethodGet {
  253. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  254. return
  255. }
  256. var stats map[string]interface{}
  257. if s.rdmaClient != nil {
  258. stats = s.rdmaClient.GetStats()
  259. stats["client_type"] = "local"
  260. } else {
  261. stats = map[string]interface{}{
  262. "client_type": "none",
  263. "error": "no RDMA client available",
  264. }
  265. }
  266. stats["timestamp"] = time.Now().Format(time.RFC3339)
  267. w.Header().Set("Content-Type", "application/json")
  268. json.NewEncoder(w).Encode(stats)
  269. }
  270. // readHandler demonstrates needle reading with RDMA
  271. func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
  272. if r.Method != http.MethodGet {
  273. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  274. return
  275. }
  276. // Parse parameters - support both file_id and individual parameters for backward compatibility
  277. query := r.URL.Query()
  278. volumeServer := query.Get("volume_server")
  279. fileID := query.Get("file_id")
  280. var volumeID, cookie uint64
  281. var needleID uint64
  282. var err error
  283. if fileID != "" {
  284. // Use file ID format (e.g., "3,01637037d6")
  285. // Extract individual components using existing SeaweedFS parsing
  286. fid, parseErr := needle.ParseFileIdFromString(fileID)
  287. if parseErr != nil {
  288. http.Error(w, fmt.Sprintf("invalid 'file_id' parameter: %v", parseErr), http.StatusBadRequest)
  289. return
  290. }
  291. volumeID = uint64(fid.VolumeId)
  292. needleID = uint64(fid.Key)
  293. cookie = uint64(fid.Cookie)
  294. } else {
  295. // Use individual parameters (backward compatibility)
  296. volumeID, err = strconv.ParseUint(query.Get("volume"), 10, 32)
  297. if err != nil {
  298. http.Error(w, "invalid 'volume' parameter", http.StatusBadRequest)
  299. return
  300. }
  301. needleID, err = strconv.ParseUint(query.Get("needle"), 10, 64)
  302. if err != nil {
  303. http.Error(w, "invalid 'needle' parameter", http.StatusBadRequest)
  304. return
  305. }
  306. // Parse cookie parameter - support both decimal and hexadecimal formats
  307. cookieStr := query.Get("cookie")
  308. if strings.HasPrefix(strings.ToLower(cookieStr), "0x") {
  309. // Parse as hexadecimal (remove "0x" prefix)
  310. cookie, err = strconv.ParseUint(cookieStr[2:], 16, 32)
  311. } else {
  312. // Parse as decimal (default)
  313. cookie, err = strconv.ParseUint(cookieStr, 10, 32)
  314. }
  315. if err != nil {
  316. http.Error(w, "invalid 'cookie' parameter (expected decimal or hex with 0x prefix)", http.StatusBadRequest)
  317. return
  318. }
  319. }
  320. var offset uint64
  321. if offsetStr := query.Get("offset"); offsetStr != "" {
  322. var parseErr error
  323. offset, parseErr = strconv.ParseUint(offsetStr, 10, 64)
  324. if parseErr != nil {
  325. http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
  326. return
  327. }
  328. }
  329. var size uint64
  330. if sizeStr := query.Get("size"); sizeStr != "" {
  331. var parseErr error
  332. size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
  333. if parseErr != nil {
  334. http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
  335. return
  336. }
  337. }
  338. if volumeServer == "" {
  339. http.Error(w, "volume_server parameter is required", http.StatusBadRequest)
  340. return
  341. }
  342. if volumeID == 0 || needleID == 0 {
  343. http.Error(w, "volume and needle parameters are required", http.StatusBadRequest)
  344. return
  345. }
  346. // Note: cookie and size can have defaults for demo purposes when user provides empty values,
  347. // but invalid parsing is caught above with proper error responses
  348. if cookie == 0 {
  349. cookie = 0x12345678 // Default cookie for demo
  350. }
  351. if size == 0 {
  352. size = 4096 // Default size
  353. }
  354. logFields := logrus.Fields{
  355. "volume_server": volumeServer,
  356. "volume_id": volumeID,
  357. "needle_id": needleID,
  358. "cookie": fmt.Sprintf("0x%x", cookie),
  359. "offset": offset,
  360. "size": size,
  361. }
  362. if fileID != "" {
  363. logFields["file_id"] = fileID
  364. }
  365. s.logger.WithFields(logFields).Info("📖 Processing needle read request")
  366. ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
  367. defer cancel()
  368. start := time.Now()
  369. req := &seaweedfs.NeedleReadRequest{
  370. VolumeID: uint32(volumeID),
  371. NeedleID: needleID,
  372. Cookie: uint32(cookie),
  373. Offset: offset,
  374. Size: size,
  375. VolumeServer: volumeServer,
  376. }
  377. resp, err := s.rdmaClient.ReadNeedle(ctx, req)
  378. if err != nil {
  379. s.logger.WithError(err).Error("❌ Needle read failed")
  380. http.Error(w, fmt.Sprintf("Read failed: %v", err), http.StatusInternalServerError)
  381. return
  382. }
  383. duration := time.Since(start)
  384. s.logger.WithFields(logrus.Fields{
  385. "volume_id": volumeID,
  386. "needle_id": needleID,
  387. "is_rdma": resp.IsRDMA,
  388. "source": resp.Source,
  389. "duration": duration,
  390. "data_size": len(resp.Data),
  391. }).Info("✅ Needle read completed")
  392. // Return metadata and first few bytes
  393. result := map[string]interface{}{
  394. "success": true,
  395. "volume_id": volumeID,
  396. "needle_id": needleID,
  397. "cookie": fmt.Sprintf("0x%x", cookie),
  398. "is_rdma": resp.IsRDMA,
  399. "source": resp.Source,
  400. "session_id": resp.SessionID,
  401. "duration": duration.String(),
  402. "data_size": len(resp.Data),
  403. "timestamp": time.Now().Format(time.RFC3339),
  404. "use_temp_file": resp.UseTempFile,
  405. "temp_file": resp.TempFilePath,
  406. }
  407. // Set headers for zero-copy optimization
  408. if resp.UseTempFile && resp.TempFilePath != "" {
  409. w.Header().Set("X-Use-Temp-File", "true")
  410. w.Header().Set("X-Temp-File", resp.TempFilePath)
  411. w.Header().Set("X-Source", resp.Source)
  412. w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
  413. // For zero-copy, return minimal JSON response and let client read from temp file
  414. w.Header().Set("Content-Type", "application/json")
  415. json.NewEncoder(w).Encode(result)
  416. return
  417. }
  418. // Regular response with data
  419. w.Header().Set("X-Source", resp.Source)
  420. w.Header().Set("X-RDMA-Used", fmt.Sprintf("%t", resp.IsRDMA))
  421. // Include first 32 bytes as hex for verification
  422. if len(resp.Data) > 0 {
  423. displayLen := 32
  424. if len(resp.Data) < displayLen {
  425. displayLen = len(resp.Data)
  426. }
  427. result["data_preview"] = fmt.Sprintf("%x", resp.Data[:displayLen])
  428. }
  429. w.Header().Set("Content-Type", "application/json")
  430. json.NewEncoder(w).Encode(result)
  431. }
  432. // benchmarkHandler runs performance benchmarks
  433. func (s *DemoServer) benchmarkHandler(w http.ResponseWriter, r *http.Request) {
  434. if r.Method != http.MethodGet {
  435. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  436. return
  437. }
  438. // Parse parameters
  439. query := r.URL.Query()
  440. iterations := 10 // default value
  441. if iterationsStr := query.Get("iterations"); iterationsStr != "" {
  442. var parseErr error
  443. iterations, parseErr = strconv.Atoi(iterationsStr)
  444. if parseErr != nil {
  445. http.Error(w, "invalid 'iterations' parameter", http.StatusBadRequest)
  446. return
  447. }
  448. }
  449. size := uint64(4096) // default value
  450. if sizeStr := query.Get("size"); sizeStr != "" {
  451. var parseErr error
  452. size, parseErr = strconv.ParseUint(sizeStr, 10, 64)
  453. if parseErr != nil {
  454. http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
  455. return
  456. }
  457. }
  458. if iterations <= 0 {
  459. iterations = 10
  460. }
  461. if size == 0 {
  462. size = 4096
  463. }
  464. s.logger.WithFields(logrus.Fields{
  465. "iterations": iterations,
  466. "size": size,
  467. }).Info("🏁 Starting benchmark")
  468. ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
  469. defer cancel()
  470. var rdmaSuccessful, rdmaFailed, httpSuccessful, httpFailed int
  471. var totalDuration time.Duration
  472. var totalBytes uint64
  473. startTime := time.Now()
  474. for i := 0; i < iterations; i++ {
  475. req := &seaweedfs.NeedleReadRequest{
  476. VolumeID: 1,
  477. NeedleID: uint64(i + 1),
  478. Cookie: 0x12345678,
  479. Offset: 0,
  480. Size: size,
  481. }
  482. opStart := time.Now()
  483. resp, err := s.rdmaClient.ReadNeedle(ctx, req)
  484. opDuration := time.Since(opStart)
  485. if err != nil {
  486. httpFailed++
  487. continue
  488. }
  489. totalDuration += opDuration
  490. totalBytes += uint64(len(resp.Data))
  491. if resp.IsRDMA {
  492. rdmaSuccessful++
  493. } else {
  494. httpSuccessful++
  495. }
  496. }
  497. benchDuration := time.Since(startTime)
  498. // Calculate statistics
  499. totalOperations := rdmaSuccessful + httpSuccessful
  500. avgLatency := time.Duration(0)
  501. if totalOperations > 0 {
  502. avgLatency = totalDuration / time.Duration(totalOperations)
  503. }
  504. throughputMBps := float64(totalBytes) / benchDuration.Seconds() / (1024 * 1024)
  505. opsPerSec := float64(totalOperations) / benchDuration.Seconds()
  506. result := map[string]interface{}{
  507. "benchmark_results": map[string]interface{}{
  508. "iterations": iterations,
  509. "size_per_op": size,
  510. "total_duration": benchDuration.String(),
  511. "successful_ops": totalOperations,
  512. "failed_ops": rdmaFailed + httpFailed,
  513. "rdma_ops": rdmaSuccessful,
  514. "http_ops": httpSuccessful,
  515. "avg_latency": avgLatency.String(),
  516. "throughput_mbps": fmt.Sprintf("%.2f", throughputMBps),
  517. "ops_per_sec": fmt.Sprintf("%.1f", opsPerSec),
  518. "total_bytes": totalBytes,
  519. },
  520. "rdma_enabled": s.rdmaClient.IsEnabled(),
  521. "timestamp": time.Now().Format(time.RFC3339),
  522. }
  523. s.logger.WithFields(logrus.Fields{
  524. "iterations": iterations,
  525. "successful_ops": totalOperations,
  526. "rdma_ops": rdmaSuccessful,
  527. "http_ops": httpSuccessful,
  528. "avg_latency": avgLatency,
  529. "throughput_mbps": throughputMBps,
  530. "ops_per_sec": opsPerSec,
  531. }).Info("📊 Benchmark completed")
  532. w.Header().Set("Content-Type", "application/json")
  533. json.NewEncoder(w).Encode(result)
  534. }
  535. // cleanupHandler handles temp file cleanup requests from mount clients
  536. func (s *DemoServer) cleanupHandler(w http.ResponseWriter, r *http.Request) {
  537. if r.Method != http.MethodDelete {
  538. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  539. return
  540. }
  541. // Get temp file path from query parameters
  542. tempFilePath := r.URL.Query().Get("temp_file")
  543. if tempFilePath == "" {
  544. http.Error(w, "missing 'temp_file' parameter", http.StatusBadRequest)
  545. return
  546. }
  547. s.logger.WithField("temp_file", tempFilePath).Debug("🗑️ Processing cleanup request")
  548. // Use the RDMA client's cleanup method (which delegates to seaweedfs client)
  549. err := s.rdmaClient.CleanupTempFile(tempFilePath)
  550. if err != nil {
  551. s.logger.WithError(err).WithField("temp_file", tempFilePath).Warn("Failed to cleanup temp file")
  552. http.Error(w, fmt.Sprintf("cleanup failed: %v", err), http.StatusInternalServerError)
  553. return
  554. }
  555. s.logger.WithField("temp_file", tempFilePath).Debug("🧹 Temp file cleanup successful")
  556. // Return success response
  557. w.Header().Set("Content-Type", "application/json")
  558. response := map[string]interface{}{
  559. "success": true,
  560. "message": "temp file cleaned up successfully",
  561. "temp_file": tempFilePath,
  562. "timestamp": time.Now().Format(time.RFC3339),
  563. }
  564. json.NewEncoder(w).Encode(response)
  565. }