| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- // Package main provides the main RDMA sidecar service that integrates with SeaweedFS
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "net/http"
- "os"
- "os/signal"
- "strconv"
- "syscall"
- "time"
- "seaweedfs-rdma-sidecar/pkg/rdma"
- "github.com/sirupsen/logrus"
- "github.com/spf13/cobra"
- )
// Command-line flag values. They are bound to the root command's flags in
// main and read by runSidecar and the HTTP handlers.
var (
	// port is the TCP port the HTTP server listens on.
	port int

	// engineSocket is the path of the RDMA engine's Unix socket.
	engineSocket string

	// debug enables debug-level logging when true.
	debug bool

	// timeout is the default timeout applied to RDMA operations.
	timeout time.Duration
)
// HealthResponse is the JSON body written by the /health endpoint when the
// RDMA engine answered a ping.
type HealthResponse struct {
	// Status is a short health label; the handler writes "healthy".
	Status string `json:"status"`
	// RdmaEngineConnected reports whether the RDMA engine is connected.
	RdmaEngineConnected bool `json:"rdma_engine_connected"`
	// RdmaEngineLatency is the engine ping latency rendered as a duration string.
	RdmaEngineLatency string `json:"rdma_engine_latency"`
	// Timestamp is the response time formatted as RFC 3339.
	Timestamp string `json:"timestamp"`
}
// CapabilitiesResponse is the JSON body written by the /rdma/capabilities
// endpoint. Its fields mirror the capabilities reported by the RDMA client.
type CapabilitiesResponse struct {
	Version         string   `json:"version"`
	DeviceName      string   `json:"device_name"`
	VendorId        uint32   `json:"vendor_id"`
	MaxSessions     uint32   `json:"max_sessions"`
	MaxTransferSize uint64   `json:"max_transfer_size"`
	ActiveSessions  uint32   `json:"active_sessions"`
	RealRdma        bool     `json:"real_rdma"`
	PortGid         string   `json:"port_gid"`
	PortLid         uint16   `json:"port_lid"`
	SupportedAuth   []string `json:"supported_auth"`
}
// PingResponse is the JSON body written by the /rdma/ping endpoint.
type PingResponse struct {
	// Success is set to true by the handler when the ping completed.
	Success bool `json:"success"`
	// EngineLatency is the latency returned by the RDMA client's Ping,
	// rendered as a duration string.
	EngineLatency string `json:"engine_latency"`
	// TotalLatency is the wall-clock time the handler measured around the
	// Ping call, rendered as a duration string.
	TotalLatency string `json:"total_latency"`
	// Timestamp is the response time formatted as RFC 3339.
	Timestamp string `json:"timestamp"`
}
- func main() {
- var rootCmd = &cobra.Command{
- Use: "rdma-sidecar",
- Short: "SeaweedFS RDMA acceleration sidecar",
- Long: `RDMA sidecar that accelerates SeaweedFS read/write operations using UCX and Rust RDMA engine.
- This sidecar acts as a bridge between SeaweedFS volume servers and the high-performance
- Rust RDMA engine, providing significant performance improvements for data-intensive workloads.`,
- RunE: runSidecar,
- }
- // Flags
- rootCmd.Flags().IntVarP(&port, "port", "p", 8081, "HTTP server port")
- rootCmd.Flags().StringVarP(&engineSocket, "engine-socket", "e", "/tmp/rdma-engine.sock", "Path to RDMA engine Unix socket")
- rootCmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
- rootCmd.Flags().DurationVarP(&timeout, "timeout", "t", 30*time.Second, "RDMA operation timeout")
- if err := rootCmd.Execute(); err != nil {
- fmt.Fprintf(os.Stderr, "Error: %v\n", err)
- os.Exit(1)
- }
- }
- func runSidecar(cmd *cobra.Command, args []string) error {
- // Setup logging
- logger := logrus.New()
- if debug {
- logger.SetLevel(logrus.DebugLevel)
- logger.SetFormatter(&logrus.TextFormatter{
- FullTimestamp: true,
- ForceColors: true,
- })
- } else {
- logger.SetLevel(logrus.InfoLevel)
- }
- logger.WithFields(logrus.Fields{
- "port": port,
- "engine_socket": engineSocket,
- "debug": debug,
- "timeout": timeout,
- }).Info("🚀 Starting SeaweedFS RDMA Sidecar")
- // Create RDMA client
- rdmaConfig := &rdma.Config{
- EngineSocketPath: engineSocket,
- DefaultTimeout: timeout,
- Logger: logger,
- }
- rdmaClient := rdma.NewClient(rdmaConfig)
- // Connect to RDMA engine
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- logger.Info("🔗 Connecting to RDMA engine...")
- if err := rdmaClient.Connect(ctx); err != nil {
- return fmt.Errorf("failed to connect to RDMA engine: %w", err)
- }
- logger.Info("✅ Connected to RDMA engine successfully")
- // Create HTTP server
- sidecar := &Sidecar{
- rdmaClient: rdmaClient,
- logger: logger,
- }
- mux := http.NewServeMux()
- // Health check endpoint
- mux.HandleFunc("/health", sidecar.healthHandler)
- // RDMA operations endpoints
- mux.HandleFunc("/rdma/read", sidecar.rdmaReadHandler)
- mux.HandleFunc("/rdma/capabilities", sidecar.capabilitiesHandler)
- mux.HandleFunc("/rdma/ping", sidecar.pingHandler)
- server := &http.Server{
- Addr: fmt.Sprintf(":%d", port),
- Handler: mux,
- }
- // Handle graceful shutdown
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- go func() {
- logger.WithField("port", port).Info("🌐 HTTP server starting")
- if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- logger.WithError(err).Fatal("HTTP server failed")
- }
- }()
- // Wait for shutdown signal
- <-sigChan
- logger.Info("📡 Received shutdown signal, gracefully shutting down...")
- // Shutdown HTTP server
- shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer shutdownCancel()
- if err := server.Shutdown(shutdownCtx); err != nil {
- logger.WithError(err).Error("HTTP server shutdown failed")
- } else {
- logger.Info("🌐 HTTP server shutdown complete")
- }
- // Disconnect from RDMA engine
- rdmaClient.Disconnect()
- logger.Info("🛑 RDMA sidecar shutdown complete")
- return nil
- }
- // Sidecar represents the main sidecar service
- type Sidecar struct {
- rdmaClient *rdma.Client
- logger *logrus.Logger
- }
- // Health check handler
- func (s *Sidecar) healthHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
- defer cancel()
- // Test RDMA engine connectivity
- if !s.rdmaClient.IsConnected() {
- s.logger.Warn("⚠️ RDMA engine not connected")
- http.Error(w, "RDMA engine not connected", http.StatusServiceUnavailable)
- return
- }
- // Ping RDMA engine
- latency, err := s.rdmaClient.Ping(ctx)
- if err != nil {
- s.logger.WithError(err).Error("❌ RDMA engine ping failed")
- http.Error(w, "RDMA engine ping failed", http.StatusServiceUnavailable)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- response := HealthResponse{
- Status: "healthy",
- RdmaEngineConnected: true,
- RdmaEngineLatency: latency.String(),
- Timestamp: time.Now().Format(time.RFC3339),
- }
- json.NewEncoder(w).Encode(response)
- }
- // RDMA capabilities handler
- func (s *Sidecar) capabilitiesHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- caps := s.rdmaClient.GetCapabilities()
- if caps == nil {
- http.Error(w, "No capabilities available", http.StatusServiceUnavailable)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- response := CapabilitiesResponse{
- Version: caps.Version,
- DeviceName: caps.DeviceName,
- VendorId: caps.VendorId,
- MaxSessions: uint32(caps.MaxSessions),
- MaxTransferSize: caps.MaxTransferSize,
- ActiveSessions: uint32(caps.ActiveSessions),
- RealRdma: caps.RealRdma,
- PortGid: caps.PortGid,
- PortLid: caps.PortLid,
- SupportedAuth: caps.SupportedAuth,
- }
- json.NewEncoder(w).Encode(response)
- }
- // RDMA ping handler
- func (s *Sidecar) pingHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
- defer cancel()
- start := time.Now()
- latency, err := s.rdmaClient.Ping(ctx)
- totalLatency := time.Since(start)
- if err != nil {
- s.logger.WithError(err).Error("❌ RDMA ping failed")
- http.Error(w, fmt.Sprintf("Ping failed: %v", err), http.StatusInternalServerError)
- return
- }
- w.Header().Set("Content-Type", "application/json")
- response := PingResponse{
- Success: true,
- EngineLatency: latency.String(),
- TotalLatency: totalLatency.String(),
- Timestamp: time.Now().Format(time.RFC3339),
- }
- json.NewEncoder(w).Encode(response)
- }
- // RDMA read handler - uses GET method with query parameters for RESTful read operations
- func (s *Sidecar) rdmaReadHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodGet {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
- // Parse query parameters
- query := r.URL.Query()
- // Get file ID (e.g., "3,01637037d6") - this is the natural SeaweedFS identifier
- fileID := query.Get("file_id")
- if fileID == "" {
- http.Error(w, "missing 'file_id' parameter", http.StatusBadRequest)
- return
- }
- // Parse optional offset and size parameters
- offset := uint64(0) // default value
- if offsetStr := query.Get("offset"); offsetStr != "" {
- val, err := strconv.ParseUint(offsetStr, 10, 64)
- if err != nil {
- http.Error(w, "invalid 'offset' parameter", http.StatusBadRequest)
- return
- }
- offset = val
- }
- size := uint64(4096) // default value
- if sizeStr := query.Get("size"); sizeStr != "" {
- val, err := strconv.ParseUint(sizeStr, 10, 64)
- if err != nil {
- http.Error(w, "invalid 'size' parameter", http.StatusBadRequest)
- return
- }
- size = val
- }
- s.logger.WithFields(logrus.Fields{
- "file_id": fileID,
- "offset": offset,
- "size": size,
- }).Info("📖 Processing RDMA read request")
- ctx, cancel := context.WithTimeout(r.Context(), timeout)
- defer cancel()
- start := time.Now()
- resp, err := s.rdmaClient.ReadFileRange(ctx, fileID, offset, size)
- duration := time.Since(start)
- if err != nil {
- s.logger.WithError(err).Error("❌ RDMA read failed")
- http.Error(w, fmt.Sprintf("RDMA read failed: %v", err), http.StatusInternalServerError)
- return
- }
- s.logger.WithFields(logrus.Fields{
- "file_id": fileID,
- "bytes_read": resp.BytesRead,
- "duration": duration,
- "transfer_rate": resp.TransferRate,
- "session_id": resp.SessionID,
- }).Info("✅ RDMA read completed successfully")
- // Set response headers
- w.Header().Set("Content-Type", "application/octet-stream")
- w.Header().Set("X-RDMA-Session-ID", resp.SessionID)
- w.Header().Set("X-RDMA-Duration", duration.String())
- w.Header().Set("X-RDMA-Transfer-Rate", fmt.Sprintf("%.2f", resp.TransferRate))
- w.Header().Set("X-RDMA-Bytes-Read", fmt.Sprintf("%d", resp.BytesRead))
- // Write the data
- w.Write(resp.Data)
- }
|