client.go 12 KB


  1. // Package seaweedfs provides SeaweedFS-specific RDMA integration
  2. package seaweedfs
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"time"

	"seaweedfs-rdma-sidecar/pkg/rdma"

	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/sirupsen/logrus"
)
  16. // SeaweedFSRDMAClient provides SeaweedFS-specific RDMA operations
  17. type SeaweedFSRDMAClient struct {
  18. rdmaClient *rdma.Client
  19. logger *logrus.Logger
  20. volumeServerURL string
  21. enabled bool
  22. // Zero-copy optimization
  23. tempDir string
  24. useZeroCopy bool
  25. }
  26. // Config holds configuration for the SeaweedFS RDMA client
  27. type Config struct {
  28. RDMASocketPath string
  29. VolumeServerURL string
  30. Enabled bool
  31. DefaultTimeout time.Duration
  32. Logger *logrus.Logger
  33. // Zero-copy optimization
  34. TempDir string // Directory for temp files (default: /tmp/rdma-cache)
  35. UseZeroCopy bool // Enable zero-copy via temp files
  36. // Connection pooling options
  37. EnablePooling bool // Enable RDMA connection pooling (default: true)
  38. MaxConnections int // Max connections in pool (default: 10)
  39. MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
  40. }
  41. // NeedleReadRequest represents a SeaweedFS needle read request
  42. type NeedleReadRequest struct {
  43. VolumeID uint32
  44. NeedleID uint64
  45. Cookie uint32
  46. Offset uint64
  47. Size uint64
  48. VolumeServer string // Override volume server URL for this request
  49. }
  50. // NeedleReadResponse represents the result of a needle read
  51. type NeedleReadResponse struct {
  52. Data []byte
  53. IsRDMA bool
  54. Latency time.Duration
  55. Source string // "rdma" or "http"
  56. SessionID string
  57. // Zero-copy optimization fields
  58. TempFilePath string // Path to temp file with data (for zero-copy)
  59. UseTempFile bool // Whether to use temp file instead of Data
  60. }
  61. // NewSeaweedFSRDMAClient creates a new SeaweedFS RDMA client
  62. func NewSeaweedFSRDMAClient(config *Config) (*SeaweedFSRDMAClient, error) {
  63. if config.Logger == nil {
  64. config.Logger = logrus.New()
  65. config.Logger.SetLevel(logrus.InfoLevel)
  66. }
  67. var rdmaClient *rdma.Client
  68. if config.Enabled && config.RDMASocketPath != "" {
  69. rdmaConfig := &rdma.Config{
  70. EngineSocketPath: config.RDMASocketPath,
  71. DefaultTimeout: config.DefaultTimeout,
  72. Logger: config.Logger,
  73. EnablePooling: config.EnablePooling,
  74. MaxConnections: config.MaxConnections,
  75. MaxIdleTime: config.MaxIdleTime,
  76. }
  77. rdmaClient = rdma.NewClient(rdmaConfig)
  78. }
  79. // Setup temp directory for zero-copy optimization
  80. tempDir := config.TempDir
  81. if tempDir == "" {
  82. tempDir = "/tmp/rdma-cache"
  83. }
  84. if config.UseZeroCopy {
  85. if err := os.MkdirAll(tempDir, 0755); err != nil {
  86. config.Logger.WithError(err).Warn("Failed to create temp directory, disabling zero-copy")
  87. config.UseZeroCopy = false
  88. }
  89. }
  90. return &SeaweedFSRDMAClient{
  91. rdmaClient: rdmaClient,
  92. logger: config.Logger,
  93. volumeServerURL: config.VolumeServerURL,
  94. enabled: config.Enabled,
  95. tempDir: tempDir,
  96. useZeroCopy: config.UseZeroCopy,
  97. }, nil
  98. }
  99. // Start initializes the RDMA client connection
  100. func (c *SeaweedFSRDMAClient) Start(ctx context.Context) error {
  101. if !c.enabled || c.rdmaClient == nil {
  102. c.logger.Info("🔄 RDMA disabled, using HTTP fallback only")
  103. return nil
  104. }
  105. c.logger.Info("🚀 Starting SeaweedFS RDMA client...")
  106. if err := c.rdmaClient.Connect(ctx); err != nil {
  107. c.logger.WithError(err).Error("❌ Failed to connect to RDMA engine")
  108. return fmt.Errorf("failed to connect to RDMA engine: %w", err)
  109. }
  110. c.logger.Info("✅ SeaweedFS RDMA client started successfully")
  111. return nil
  112. }
  113. // Stop shuts down the RDMA client
  114. func (c *SeaweedFSRDMAClient) Stop() {
  115. if c.rdmaClient != nil {
  116. c.rdmaClient.Disconnect()
  117. c.logger.Info("🔌 SeaweedFS RDMA client stopped")
  118. }
  119. }
  120. // IsEnabled returns true if RDMA is enabled and available
  121. func (c *SeaweedFSRDMAClient) IsEnabled() bool {
  122. return c.enabled && c.rdmaClient != nil && c.rdmaClient.IsConnected()
  123. }
  124. // ReadNeedle reads a needle using RDMA fast path or HTTP fallback
  125. func (c *SeaweedFSRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
  126. start := time.Now()
  127. var rdmaErr error
  128. // Try RDMA fast path first
  129. if c.IsEnabled() {
  130. c.logger.WithFields(logrus.Fields{
  131. "volume_id": req.VolumeID,
  132. "needle_id": req.NeedleID,
  133. "offset": req.Offset,
  134. "size": req.Size,
  135. }).Debug("🚀 Attempting RDMA fast path")
  136. rdmaReq := &rdma.ReadRequest{
  137. VolumeID: req.VolumeID,
  138. NeedleID: req.NeedleID,
  139. Cookie: req.Cookie,
  140. Offset: req.Offset,
  141. Size: req.Size,
  142. }
  143. resp, err := c.rdmaClient.Read(ctx, rdmaReq)
  144. if err != nil {
  145. c.logger.WithError(err).Warn("⚠️ RDMA read failed, falling back to HTTP")
  146. rdmaErr = err
  147. } else {
  148. c.logger.WithFields(logrus.Fields{
  149. "volume_id": req.VolumeID,
  150. "needle_id": req.NeedleID,
  151. "bytes_read": resp.BytesRead,
  152. "transfer_rate": resp.TransferRate,
  153. "latency": time.Since(start),
  154. }).Info("🚀 RDMA fast path successful")
  155. // Try zero-copy optimization if enabled and data is large enough
  156. if c.useZeroCopy && len(resp.Data) > 64*1024 { // 64KB threshold
  157. tempFilePath, err := c.writeToTempFile(req, resp.Data)
  158. if err != nil {
  159. c.logger.WithError(err).Warn("Failed to write temp file, using regular response")
  160. // Fall back to regular response
  161. } else {
  162. c.logger.WithFields(logrus.Fields{
  163. "temp_file": tempFilePath,
  164. "size": len(resp.Data),
  165. }).Info("🔥 Zero-copy temp file created")
  166. return &NeedleReadResponse{
  167. Data: nil, // Don't duplicate data in memory
  168. IsRDMA: true,
  169. Latency: time.Since(start),
  170. Source: "rdma-zerocopy",
  171. SessionID: resp.SessionID,
  172. TempFilePath: tempFilePath,
  173. UseTempFile: true,
  174. }, nil
  175. }
  176. }
  177. return &NeedleReadResponse{
  178. Data: resp.Data,
  179. IsRDMA: true,
  180. Latency: time.Since(start),
  181. Source: "rdma",
  182. SessionID: resp.SessionID,
  183. }, nil
  184. }
  185. }
  186. // Fallback to HTTP
  187. c.logger.WithFields(logrus.Fields{
  188. "volume_id": req.VolumeID,
  189. "needle_id": req.NeedleID,
  190. "reason": "rdma_unavailable",
  191. }).Debug("🌐 Using HTTP fallback")
  192. data, err := c.httpFallback(ctx, req)
  193. if err != nil {
  194. if rdmaErr != nil {
  195. return nil, fmt.Errorf("both RDMA and HTTP fallback failed: RDMA=%v, HTTP=%v", rdmaErr, err)
  196. }
  197. return nil, fmt.Errorf("HTTP fallback failed: %w", err)
  198. }
  199. return &NeedleReadResponse{
  200. Data: data,
  201. IsRDMA: false,
  202. Latency: time.Since(start),
  203. Source: "http",
  204. }, nil
  205. }
  206. // ReadNeedleRange reads a specific range from a needle
  207. func (c *SeaweedFSRDMAClient) ReadNeedleRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*NeedleReadResponse, error) {
  208. req := &NeedleReadRequest{
  209. VolumeID: volumeID,
  210. NeedleID: needleID,
  211. Cookie: cookie,
  212. Offset: offset,
  213. Size: size,
  214. }
  215. return c.ReadNeedle(ctx, req)
  216. }
  217. // httpFallback performs HTTP fallback read from SeaweedFS volume server
  218. func (c *SeaweedFSRDMAClient) httpFallback(ctx context.Context, req *NeedleReadRequest) ([]byte, error) {
  219. // Use volume server from request, fallback to configured URL
  220. volumeServerURL := req.VolumeServer
  221. if volumeServerURL == "" {
  222. volumeServerURL = c.volumeServerURL
  223. }
  224. if volumeServerURL == "" {
  225. return nil, fmt.Errorf("no volume server URL provided in request or configured")
  226. }
  227. // Build URL using existing SeaweedFS file ID construction
  228. volumeId := needle.VolumeId(req.VolumeID)
  229. needleId := types.NeedleId(req.NeedleID)
  230. cookie := types.Cookie(req.Cookie)
  231. fileId := &needle.FileId{
  232. VolumeId: volumeId,
  233. Key: needleId,
  234. Cookie: cookie,
  235. }
  236. url := fmt.Sprintf("%s/%s", volumeServerURL, fileId.String())
  237. if req.Offset > 0 || req.Size > 0 {
  238. url += fmt.Sprintf("?offset=%d&size=%d", req.Offset, req.Size)
  239. }
  240. c.logger.WithField("url", url).Debug("📥 HTTP fallback request")
  241. httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
  242. if err != nil {
  243. return nil, fmt.Errorf("failed to create HTTP request: %w", err)
  244. }
  245. client := &http.Client{Timeout: 30 * time.Second}
  246. resp, err := client.Do(httpReq)
  247. if err != nil {
  248. return nil, fmt.Errorf("HTTP request failed: %w", err)
  249. }
  250. defer resp.Body.Close()
  251. if resp.StatusCode != http.StatusOK {
  252. return nil, fmt.Errorf("HTTP request failed with status: %d", resp.StatusCode)
  253. }
  254. // Read response data - io.ReadAll handles context cancellation and timeouts correctly
  255. data, err := io.ReadAll(resp.Body)
  256. if err != nil {
  257. return nil, fmt.Errorf("failed to read HTTP response body: %w", err)
  258. }
  259. c.logger.WithFields(logrus.Fields{
  260. "volume_id": req.VolumeID,
  261. "needle_id": req.NeedleID,
  262. "data_size": len(data),
  263. }).Debug("📥 HTTP fallback successful")
  264. return data, nil
  265. }
  266. // HealthCheck verifies that the RDMA client is healthy
  267. func (c *SeaweedFSRDMAClient) HealthCheck(ctx context.Context) error {
  268. if !c.enabled {
  269. return fmt.Errorf("RDMA is disabled")
  270. }
  271. if c.rdmaClient == nil {
  272. return fmt.Errorf("RDMA client not initialized")
  273. }
  274. if !c.rdmaClient.IsConnected() {
  275. return fmt.Errorf("RDMA client not connected")
  276. }
  277. // Try a ping to the RDMA engine
  278. _, err := c.rdmaClient.Ping(ctx)
  279. return err
  280. }
  281. // GetStats returns statistics about the RDMA client
  282. func (c *SeaweedFSRDMAClient) GetStats() map[string]interface{} {
  283. stats := map[string]interface{}{
  284. "enabled": c.enabled,
  285. "volume_server_url": c.volumeServerURL,
  286. "rdma_socket_path": "",
  287. }
  288. if c.rdmaClient != nil {
  289. stats["connected"] = c.rdmaClient.IsConnected()
  290. // Note: Capabilities method may not be available, skip for now
  291. } else {
  292. stats["connected"] = false
  293. stats["error"] = "RDMA client not initialized"
  294. }
  295. return stats
  296. }
  297. // writeToTempFile writes RDMA data to a temp file for zero-copy optimization
  298. func (c *SeaweedFSRDMAClient) writeToTempFile(req *NeedleReadRequest, data []byte) (string, error) {
  299. // Create temp file with unique name based on needle info
  300. fileName := fmt.Sprintf("vol%d_needle%x_cookie%d_offset%d_size%d.tmp",
  301. req.VolumeID, req.NeedleID, req.Cookie, req.Offset, req.Size)
  302. tempFilePath := filepath.Join(c.tempDir, fileName)
  303. // Write data to temp file (this populates the page cache)
  304. err := os.WriteFile(tempFilePath, data, 0644)
  305. if err != nil {
  306. return "", fmt.Errorf("failed to write temp file: %w", err)
  307. }
  308. c.logger.WithFields(logrus.Fields{
  309. "temp_file": tempFilePath,
  310. "size": len(data),
  311. }).Debug("📁 Temp file written to page cache")
  312. return tempFilePath, nil
  313. }
  314. // CleanupTempFile removes a temp file (called by mount client after use)
  315. func (c *SeaweedFSRDMAClient) CleanupTempFile(tempFilePath string) error {
  316. if tempFilePath == "" {
  317. return nil
  318. }
  319. // Validate that tempFilePath is within c.tempDir
  320. absTempDir, err := filepath.Abs(c.tempDir)
  321. if err != nil {
  322. return fmt.Errorf("failed to resolve temp dir: %w", err)
  323. }
  324. absFilePath, err := filepath.Abs(tempFilePath)
  325. if err != nil {
  326. return fmt.Errorf("failed to resolve temp file path: %w", err)
  327. }
  328. // Ensure absFilePath is within absTempDir
  329. if !strings.HasPrefix(absFilePath, absTempDir+string(os.PathSeparator)) && absFilePath != absTempDir {
  330. c.logger.WithField("temp_file", tempFilePath).Warn("Attempted cleanup of file outside temp dir")
  331. return fmt.Errorf("invalid temp file path")
  332. }
  333. err = os.Remove(absFilePath)
  334. if err != nil && !os.IsNotExist(err) {
  335. c.logger.WithError(err).WithField("temp_file", absFilePath).Warn("Failed to cleanup temp file")
  336. return err
  337. }
  338. c.logger.WithField("temp_file", absFilePath).Debug("🧹 Temp file cleaned up")
  339. return nil
  340. }