main.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Package main provides the main RDMA sidecar service that integrates with SeaweedFS
  2. package main
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "net/http"
  8. "os"
  9. "os/signal"
  10. "strconv"
  11. "syscall"
  12. "time"
  13. "seaweedfs-rdma-sidecar/pkg/rdma"
  14. "github.com/sirupsen/logrus"
  15. "github.com/spf13/cobra"
  16. )
  17. var (
  18. port int
  19. engineSocket string
  20. debug bool
  21. timeout time.Duration
  22. )
  // JSON response payloads written by the sidecar's HTTP handlers.
type (
	// HealthResponse is the body returned by GET /health when the RDMA engine
	// is connected and answers a ping. Timestamp is formatted as RFC 3339.
	HealthResponse struct {
		Status              string `json:"status"`
		RdmaEngineConnected bool   `json:"rdma_engine_connected"`
		RdmaEngineLatency   string `json:"rdma_engine_latency"`
		Timestamp           string `json:"timestamp"`
	}

	// CapabilitiesResponse is the body returned by GET /rdma/capabilities; it
	// copies the capabilities reported by the RDMA client.
	CapabilitiesResponse struct {
		Version         string   `json:"version"`
		DeviceName      string   `json:"device_name"`
		VendorId        uint32   `json:"vendor_id"`
		MaxSessions     uint32   `json:"max_sessions"`
		MaxTransferSize uint64   `json:"max_transfer_size"`
		ActiveSessions  uint32   `json:"active_sessions"`
		RealRdma        bool     `json:"real_rdma"`
		PortGid         string   `json:"port_gid"`
		PortLid         uint16   `json:"port_lid"`
		SupportedAuth   []string `json:"supported_auth"`
	}

	// PingResponse is the body returned by a successful GET /rdma/ping.
	// EngineLatency is the latency reported by the engine; TotalLatency is the
	// wall time the sidecar spent on the ping call. Both hold String() forms.
	PingResponse struct {
		Success       bool   `json:"success"`
		EngineLatency string `json:"engine_latency"`
		TotalLatency  string `json:"total_latency"`
		Timestamp     string `json:"timestamp"`
	}
)
  48. func main() {
  49. var rootCmd = &cobra.Command{
  50. Use: "rdma-sidecar",
  51. Short: "SeaweedFS RDMA acceleration sidecar",
  52. Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine.
  53. This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance
  54. Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`,
  55. RunE: runSidecar,
  56. }
  57. // Flags
  58. rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port")
  59. rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
  60. rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
  61. rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout")
  62. if err := rootCmd.Execute(); err != nil {
  63. fmt.Fprintf(os.Stderr, "Error: %v\n", err)
  64. os.Exit(1)
  65. }
  66. }
  67. func runSidecar(cmd *cobra.Command, args []string) error {
  68. // Setup logging
  69. logger := logrus.New()
  70. if debug {
  71. logger.SetLevel(logrus.DebugLevel)
  72. logger.SetFormatter(&logrus.TextFormatter{
  73. FullTimestamp: true,
  74. ForceColors: true,
  75. })
  76. } else {
  77. logger.SetLevel(logrus.InfoLevel)
  78. }
  79. logger.WithFields(logrus.Fields{
  80. "port": port,
  81. "engine_socket": engineSocket,
  82. "debug": debug,
  83. "timeout": timeout,
  84. }).Info("🚀 Starting SeaweedFS RDMA Sidecar")
  85. // Create RDMA client
  86. rdmaConfig := &rdma.Config{
  87. EngineSocketPath: engineSocket,
  88. DefaultTimeout: timeout,
  89. Logger: logger,
  90. }
  91. rdmaClient := rdma.NewClient(rdmaConfig)
  92. // Connect to RDMA engine
  93. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  94. defer cancel()
  95. logger.Info("🔗 Connecting to RDMA engine...")
  96. if err := rdmaClient.Connect(ctx); err != nil {
  97. return fmt.Errorf("failed to connect to RDMA engine: %w", err)
  98. }
  99. logger.Info("✅ Connected to RDMA engine successfully")
  100. // Create HTTP server
  101. sidecar := &Sidecar{
  102. rdmaClient: rdmaClient,
  103. logger: logger,
  104. }
  105. mux := http.NewServeMux()
  106. // Health check endpoint
  107. mux.HandleFunc("/health", sidecar.healthHandler)
  108. // RDMA operations endpoints
  109. mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler)
  110. mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler)
  111. mux.HandleFunc("/rdma/ping", sidecar.pingHandler)
  112. server := &http.Server{
  113. Addr: fmt.Sprintf(":%d", port),
  114. Handler: mux,
  115. }
  116. // Handle graceful shutdown
  117. sigChan := make(chan os.Signal, 1)
  118. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  119. go func() {
  120. logger.WithField("port", port).Info("🌐 HTTP server starting")
  121. if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  122. logger.WithError(err).Fatal("HTTP server failed")
  123. }
  124. }()
  125. // Wait for shutdown signal
  126. <-sigChan
  127. logger.Info("📡 Received shutdown signal, gracefully shutting down...")
  128. // Shutdown HTTP server
  129. shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
  130. defer shutdownCancel()
  131. if err := server.Shutdown(shutdownCtx); err != nil {
  132. logger.WithError(err).Error("HTTP server shutdown failed")
  133. } else {
  134. logger.Info("🌐 HTTP server shutdown complete")
  135. }
  136. // Disconnect from RDMA engine
  137. rdmaClient.Disconnect()
  138. logger.Info("🛑 RDMA sidecar shutdown complete")
  139. return nil
  140. }
  141. // Sidecar represents the main sidecar service
  142. type Sidecar struct {
  143. rdmaClient *rdma.Client
  144. logger *logrus.Logger
  145. }
  146. // Health check handler
  147. func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) {
  148. if r.Method != http.MethodGet {
  149. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  150. return
  151. }
  152. ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
  153. defer cancel()
  154. // Test RDMA engine connectivity
  155. if !s.rdmaClient.IsConnected() {
  156. s.logger.Warn("⚠️ RDMA engine not connected")
  157. http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable)
  158. return
  159. }
  160. // Ping RDMA engine
  161. latency, err := s.rdmaClient.Ping(ctx)
  162. if err != nil {
  163. s.logger.WithError(err).Error("❌ RDMA engine ping failed")
  164. http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable)
  165. return
  166. }
  167. w.Header().Set("Content-Type", "application/json")
  168. response := HealthResponse{
  169. Status: "healthy",
  170. RdmaEngineConnected: true,
  171. RdmaEngineLatency: latency.String(),
  172. Timestamp: time.Now().Format(time.RFC3339),
  173. }
  174. json.NewEncoder(w).Encode(response)
  175. }
  176. // RDMA capabilities handler
  177. func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) {
  178. if r.Method != http.MethodGet {
  179. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  180. return
  181. }
  182. caps := s.rdmaClient.GetCapabilities()
  183. if caps == nil {
  184. http.Error(w, "No capabilities available", http.StatusServiceUnavailable)
  185. return
  186. }
  187. w.Header().Set("Content-Type", "application/json")
  188. response := CapabilitiesResponse{
  189. Version: caps.Version,
  190. DeviceName: caps.DeviceName,
  191. VendorId: caps.VendorId,
  192. MaxSessions: uint32(caps.MaxSessions),
  193. MaxTransferSize: caps.MaxTransferSize,
  194. ActiveSessions: uint32(caps.ActiveSessions),
  195. RealRdma: caps.RealRdma,
  196. PortGid: caps.PortGid,
  197. PortLid: caps.PortLid,
  198. SupportedAuth: caps.SupportedAuth,
  199. }
  200. json.NewEncoder(w).Encode(response)
  201. }
  202. // RDMA ping handler
  203. func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) {
  204. if r.Method != http.MethodGet {
  205. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  206. return
  207. }
  208. ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
  209. defer cancel()
  210. start := time.Now()
  211. latency, err := s.rdmaClient.Ping(ctx)
  212. totalLatency := time.Since(start)
  213. if err != nil {
  214. s.logger.WithError(err).Error("❌ RDMA ping failed")
  215. http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError)
  216. return
  217. }
  218. w.Header().Set("Content-Type", "application/json")
  219. response := PingResponse{
  220. Success: true,
  221. EngineLatency: latency.String(),
  222. TotalLatency: totalLatency.String(),
  223. Timestamp: time.Now().Format(time.RFC3339),
  224. }
  225. json.NewEncoder(w).Encode(response)
  226. }
  227. // RDMA read handler - uses GET method with query parameters for RESTful read operations
  228. func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) {
  229. if r.Method != http.MethodGet {
  230. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  231. return
  232. }
  233. // Parse query parameters
  234. query := r.URL.Query()
  235. // Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier
  236. fileID := query.Get("file_id")
  237. if fileID == "" {
  238. http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest)
  239. return
  240. }
  241. // Parse optional offset and size parameters
  242. offset := uint64(0) // default value
  243. if offsetStr := query.Get("offset"); offsetStr != "" {
  244. val, err := strconv.ParseUint(offsetStr, 10, 64)
  245. if err != nil {
  246. http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
  247. return
  248. }
  249. offset = val
  250. }
  251. size := uint64(4096) // default value
  252. if sizeStr := query.Get("size"); sizeStr != "" {
  253. val, err := strconv.ParseUint(sizeStr, 10, 64)
  254. if err != nil {
  255. http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
  256. return
  257. }
  258. size = val
  259. }
  260. s.logger.WithFields(logrus.Fields{
  261. "file_id": fileID,
  262. "offset": offset,
  263. "size": size,
  264. }).Info("📖 Processing RDMA read request")
  265. ctx, cancel := context.WithTimeout(r.Context(), timeout)
  266. defer cancel()
  267. start := time.Now()
  268. resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size)
  269. duration := time.Since(start)
  270. if err != nil {
  271. s.logger.WithError(err).Error("❌ RDMA read failed")
  272. http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError)
  273. return
  274. }
  275. s.logger.WithFields(logrus.Fields{
  276. "file_id": fileID,
  277. "bytes_read": resp.BytesRead,
  278. "duration": duration,
  279. "transfer_rate": resp.TransferRate,
  280. "session_id": resp.SessionID,
  281. }).Info("✅ RDMA read completed successfully")
  282. // Set response headers
  283. w.Header().Set("Content-Type", "application/octet-stream")
  284. w.Header().Set("X-RDMA-Session-ID", resp.SessionID)
  285. w.Header().Set("X-RDMA-Duration", duration.String())
  286. w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate))
  287. w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead))
  288. // Write the data
  289. w.Write(resp.Data)
  290. }