rdma_client.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package mount
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "strings"
  11. "sync/atomic"
  12. "time"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  15. )
// RDMAMountClient provides RDMA acceleration for SeaweedFS mount operations.
//
// It talks to an RDMA sidecar over HTTP (health, read and cleanup endpoints),
// limits the number of in-flight ReadNeedle calls with a channel used as a
// semaphore, and keeps atomic counters that GetStats reports.
type RDMAMountClient struct {
	// sidecarAddr is interpolated into "http://<sidecarAddr>/..." request URLs.
	sidecarAddr string
	// httpClient is shared by the health, read and cleanup requests.
	httpClient *http.Client
	// maxConcurrent is the capacity the semaphore channel was created with.
	maxConcurrent int
	// timeout is used for the HTTP client and for the health-check context.
	timeout time.Duration
	// semaphore holds one element per ReadNeedle call currently in progress.
	semaphore chan struct{}

	// Volume lookup
	// lookupFileIdFn resolves a file ID to the URLs it can be fetched from.
	lookupFileIdFn wdclient.LookupFileIdFunctionType

	// Statistics (all updated by ReadNeedle, read by GetStats)
	totalRequests   atomic.Int64
	successfulReads atomic.Int64
	failedReads     atomic.Int64
	totalBytesRead  atomic.Int64
	totalLatencyNs  atomic.Int64
}
// RDMAReadRequest represents a request to read data via RDMA.
//
// NOTE(review): nothing visible in this file constructs or sends this type;
// ReadNeedle passes file_id/offset/size as URL query parameters instead.
// Presumably it mirrors a JSON request body accepted by the sidecar — confirm
// before relying on or removing it.
type RDMAReadRequest struct {
	VolumeID uint32 `json:"volume_id"`
	NeedleID uint64 `json:"needle_id"`
	Cookie   uint32 `json:"cookie"`
	Offset   uint64 `json:"offset"`
	Size     uint64 `json:"size"`
}
// RDMAReadResponse represents the response from an RDMA read operation.
//
// NOTE(review): nothing visible in this file decodes into this type;
// ReadNeedle takes the equivalent information from response headers
// (X-Source, X-RDMA-Used, X-Use-Temp-File, X-Temp-File). Presumably it mirrors
// a JSON response body produced by the sidecar — confirm against the sidecar.
type RDMAReadResponse struct {
	Success   bool   `json:"success"`
	IsRDMA    bool   `json:"is_rdma"`
	Source    string `json:"source"`
	Duration  string `json:"duration"`
	DataSize  int    `json:"data_size"`
	SessionID string `json:"session_id,omitempty"`
	ErrorMsg  string `json:"error,omitempty"`

	// Zero-copy optimization fields
	UseTempFile bool   `json:"use_temp_file"`
	TempFile    string `json:"temp_file"`
}
// RDMAHealthResponse represents the health status of the RDMA sidecar.
//
// healthCheck decodes the JSON body of GET /health into this type.
type RDMAHealthResponse struct {
	// Status must equal "healthy" for healthCheck to succeed.
	Status string `json:"status"`
	RDMA   struct {
		// Enabled must be true for healthCheck to succeed.
		Enabled bool `json:"enabled"`
		// Connected being false only produces a warning log in healthCheck.
		Connected bool `json:"connected"`
	} `json:"rdma"`
	// Timestamp is decoded but not inspected by the code in this file.
	Timestamp string `json:"timestamp"`
}
  62. // NewRDMAMountClient creates a new RDMA client for mount operations
  63. func NewRDMAMountClient(sidecarAddr string, lookupFileIdFn wdclient.LookupFileIdFunctionType, maxConcurrent int, timeoutMs int) (*RDMAMountClient, error) {
  64. client := &RDMAMountClient{
  65. sidecarAddr: sidecarAddr,
  66. maxConcurrent: maxConcurrent,
  67. timeout: time.Duration(timeoutMs) * time.Millisecond,
  68. httpClient: &http.Client{
  69. Timeout: time.Duration(timeoutMs) * time.Millisecond,
  70. },
  71. semaphore: make(chan struct{}, maxConcurrent),
  72. lookupFileIdFn: lookupFileIdFn,
  73. }
  74. // Test connectivity and RDMA availability
  75. if err := client.healthCheck(); err != nil {
  76. return nil, fmt.Errorf("RDMA sidecar health check failed: %w", err)
  77. }
  78. glog.Infof("RDMA mount client initialized: sidecar=%s, maxConcurrent=%d, timeout=%v",
  79. sidecarAddr, maxConcurrent, client.timeout)
  80. return client, nil
  81. }
  82. // lookupVolumeLocationByFileID finds the best volume server for a given file ID
  83. func (c *RDMAMountClient) lookupVolumeLocationByFileID(ctx context.Context, fileID string) (string, error) {
  84. glog.V(4).Infof("Looking up volume location for file ID %s", fileID)
  85. targetUrls, err := c.lookupFileIdFn(ctx, fileID)
  86. if err != nil {
  87. return "", fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
  88. }
  89. if len(targetUrls) == 0 {
  90. return "", fmt.Errorf("no locations found for file %s", fileID)
  91. }
  92. // Choose the first URL and extract the server address
  93. targetUrl := targetUrls[0]
  94. // Extract server address from URL like "http://server:port/fileId"
  95. parts := strings.Split(targetUrl, "/")
  96. if len(parts) < 3 {
  97. return "", fmt.Errorf("invalid target URL format: %s", targetUrl)
  98. }
  99. bestAddress := fmt.Sprintf("http://%s", parts[2])
  100. glog.V(4).Infof("File %s located at %s", fileID, bestAddress)
  101. return bestAddress, nil
  102. }
  103. // lookupVolumeLocation finds the best volume server for a given volume ID (legacy method)
  104. func (c *RDMAMountClient) lookupVolumeLocation(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (string, error) {
  105. // Create a file ID for lookup (format: volumeId,needleId,cookie)
  106. fileID := fmt.Sprintf("%d,%x,%d", volumeID, needleID, cookie)
  107. return c.lookupVolumeLocationByFileID(ctx, fileID)
  108. }
  109. // healthCheck verifies that the RDMA sidecar is available and functioning
  110. func (c *RDMAMountClient) healthCheck() error {
  111. ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
  112. defer cancel()
  113. req, err := http.NewRequestWithContext(ctx, "GET",
  114. fmt.Sprintf("http://%s/health", c.sidecarAddr), nil)
  115. if err != nil {
  116. return fmt.Errorf("failed to create health check request: %w", err)
  117. }
  118. resp, err := c.httpClient.Do(req)
  119. if err != nil {
  120. return fmt.Errorf("health check request failed: %w", err)
  121. }
  122. defer resp.Body.Close()
  123. if resp.StatusCode != http.StatusOK {
  124. return fmt.Errorf("health check failed with status: %s", resp.Status)
  125. }
  126. // Parse health response
  127. var health RDMAHealthResponse
  128. if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
  129. return fmt.Errorf("failed to parse health response: %w", err)
  130. }
  131. if health.Status != "healthy" {
  132. return fmt.Errorf("sidecar reports unhealthy status: %s", health.Status)
  133. }
  134. if !health.RDMA.Enabled {
  135. return fmt.Errorf("RDMA is not enabled on sidecar")
  136. }
  137. if !health.RDMA.Connected {
  138. glog.Warningf("RDMA sidecar is healthy but not connected to RDMA engine")
  139. }
  140. return nil
  141. }
  142. // ReadNeedle reads data from a specific needle using RDMA acceleration
  143. func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, size uint64) ([]byte, bool, error) {
  144. // Acquire semaphore for concurrency control
  145. select {
  146. case c.semaphore <- struct{}{}:
  147. defer func() { <-c.semaphore }()
  148. case <-ctx.Done():
  149. return nil, false, ctx.Err()
  150. }
  151. c.totalRequests.Add(1)
  152. startTime := time.Now()
  153. // Lookup volume location using file ID directly
  154. volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID)
  155. if err != nil {
  156. c.failedReads.Add(1)
  157. return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
  158. }
  159. // Prepare request URL with file_id parameter (simpler than individual components)
  160. reqURL := fmt.Sprintf("http://%s/read?file_id=%s&offset=%d&size=%d&volume_server=%s",
  161. c.sidecarAddr, fileID, offset, size, volumeServer)
  162. req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
  163. if err != nil {
  164. c.failedReads.Add(1)
  165. return nil, false, fmt.Errorf("failed to create RDMA request: %w", err)
  166. }
  167. // Execute request
  168. resp, err := c.httpClient.Do(req)
  169. if err != nil {
  170. c.failedReads.Add(1)
  171. return nil, false, fmt.Errorf("RDMA request failed: %w", err)
  172. }
  173. defer resp.Body.Close()
  174. duration := time.Since(startTime)
  175. c.totalLatencyNs.Add(duration.Nanoseconds())
  176. if resp.StatusCode != http.StatusOK {
  177. c.failedReads.Add(1)
  178. body, _ := io.ReadAll(resp.Body)
  179. return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body))
  180. }
  181. // Check if response indicates RDMA was used
  182. contentType := resp.Header.Get("Content-Type")
  183. isRDMA := strings.Contains(resp.Header.Get("X-Source"), "rdma") ||
  184. resp.Header.Get("X-RDMA-Used") == "true"
  185. // Check for zero-copy temp file optimization
  186. tempFilePath := resp.Header.Get("X-Temp-File")
  187. useTempFile := resp.Header.Get("X-Use-Temp-File") == "true"
  188. var data []byte
  189. if useTempFile && tempFilePath != "" {
  190. // Zero-copy path: read from temp file (page cache)
  191. glog.V(4).Infof("🔥 Using zero-copy temp file: %s", tempFilePath)
  192. // Allocate buffer for temp file read
  193. var bufferSize uint64 = 1024 * 1024 // Default 1MB
  194. if size > 0 {
  195. bufferSize = size
  196. }
  197. buffer := make([]byte, bufferSize)
  198. n, err := c.readFromTempFile(tempFilePath, buffer)
  199. if err != nil {
  200. glog.V(2).Infof("Zero-copy failed, falling back to HTTP body: %v", err)
  201. // Fall back to reading HTTP body
  202. data, err = io.ReadAll(resp.Body)
  203. } else {
  204. data = buffer[:n]
  205. glog.V(4).Infof("🔥 Zero-copy successful: %d bytes from page cache", n)
  206. }
  207. // Important: Cleanup temp file after reading (consumer responsibility)
  208. // This prevents accumulation of temp files in /tmp/rdma-cache
  209. go c.cleanupTempFile(tempFilePath)
  210. } else {
  211. // Regular path: read from HTTP response body
  212. data, err = io.ReadAll(resp.Body)
  213. }
  214. if err != nil {
  215. c.failedReads.Add(1)
  216. return nil, false, fmt.Errorf("failed to read RDMA response: %w", err)
  217. }
  218. c.successfulReads.Add(1)
  219. c.totalBytesRead.Add(int64(len(data)))
  220. // Log successful operation
  221. glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s",
  222. fileID, size, duration, isRDMA, contentType)
  223. return data, isRDMA, nil
  224. }
  225. // cleanupTempFile requests cleanup of a temp file from the sidecar
  226. func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) {
  227. if tempFilePath == "" {
  228. return
  229. }
  230. // Give the page cache a brief moment to be utilized before cleanup
  231. // This preserves the zero-copy performance window
  232. time.Sleep(100 * time.Millisecond)
  233. // Call sidecar cleanup endpoint
  234. cleanupURL := fmt.Sprintf("http://%s/cleanup?temp_file=%s", c.sidecarAddr, url.QueryEscape(tempFilePath))
  235. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  236. defer cancel()
  237. req, err := http.NewRequestWithContext(ctx, "DELETE", cleanupURL, nil)
  238. if err != nil {
  239. glog.V(2).Infof("Failed to create cleanup request for %s: %v", tempFilePath, err)
  240. return
  241. }
  242. resp, err := c.httpClient.Do(req)
  243. if err != nil {
  244. glog.V(2).Infof("Failed to cleanup temp file %s: %v", tempFilePath, err)
  245. return
  246. }
  247. defer resp.Body.Close()
  248. if resp.StatusCode == http.StatusOK {
  249. glog.V(4).Infof("🧹 Temp file cleaned up: %s", tempFilePath)
  250. } else {
  251. glog.V(2).Infof("Cleanup failed for %s: status %s", tempFilePath, resp.Status)
  252. }
  253. }
  254. // GetStats returns current RDMA client statistics
  255. func (c *RDMAMountClient) GetStats() map[string]interface{} {
  256. totalRequests := c.totalRequests.Load()
  257. successfulReads := c.successfulReads.Load()
  258. failedReads := c.failedReads.Load()
  259. totalBytesRead := c.totalBytesRead.Load()
  260. totalLatencyNs := c.totalLatencyNs.Load()
  261. successRate := float64(0)
  262. avgLatencyNs := int64(0)
  263. if totalRequests > 0 {
  264. successRate = float64(successfulReads) / float64(totalRequests) * 100
  265. avgLatencyNs = totalLatencyNs / totalRequests
  266. }
  267. return map[string]interface{}{
  268. "sidecar_addr": c.sidecarAddr,
  269. "max_concurrent": c.maxConcurrent,
  270. "timeout_ms": int(c.timeout / time.Millisecond),
  271. "total_requests": totalRequests,
  272. "successful_reads": successfulReads,
  273. "failed_reads": failedReads,
  274. "success_rate_pct": fmt.Sprintf("%.1f", successRate),
  275. "total_bytes_read": totalBytesRead,
  276. "avg_latency_ns": avgLatencyNs,
  277. "avg_latency_ms": fmt.Sprintf("%.3f", float64(avgLatencyNs)/1000000),
  278. }
  279. }
  280. // Close shuts down the RDMA client and releases resources
  281. func (c *RDMAMountClient) Close() error {
  282. // No need to close semaphore channel; closing it may cause panics if goroutines are still using it.
  283. // The semaphore will be garbage collected when the client is no longer referenced.
  284. // Log final statistics
  285. stats := c.GetStats()
  286. glog.Infof("RDMA mount client closing: %+v", stats)
  287. return nil
  288. }
  289. // IsHealthy checks if the RDMA sidecar is currently healthy
  290. func (c *RDMAMountClient) IsHealthy() bool {
  291. err := c.healthCheck()
  292. return err == nil
  293. }
  294. // readFromTempFile performs zero-copy read from temp file using page cache
  295. func (c *RDMAMountClient) readFromTempFile(tempFilePath string, buffer []byte) (int, error) {
  296. if tempFilePath == "" {
  297. return 0, fmt.Errorf("empty temp file path")
  298. }
  299. // Open temp file for reading
  300. file, err := os.Open(tempFilePath)
  301. if err != nil {
  302. return 0, fmt.Errorf("failed to open temp file %s: %w", tempFilePath, err)
  303. }
  304. defer file.Close()
  305. // Read from temp file (this should be served from page cache)
  306. n, err := file.Read(buffer)
  307. if err != nil && err != io.EOF {
  308. return n, fmt.Errorf("failed to read from temp file: %w", err)
  309. }
  310. glog.V(4).Infof("🔥 Zero-copy read: %d bytes from temp file %s", n, tempFilePath)
  311. return n, nil
  312. }