| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- package mount
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "strings"
- "sync/atomic"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
- )
- // RDMAMountClient provides RDMA acceleration for SeaweedFS mount operations
- type RDMAMountClient struct {
- sidecarAddr string
- httpClient *http.Client
- maxConcurrent int
- timeout time.Duration
- semaphore chan struct{}
- // Volume lookup
- lookupFileIdFn wdclient.LookupFileIdFunctionType
- // Statistics
- totalRequests atomic.Int64
- successfulReads atomic.Int64
- failedReads atomic.Int64
- totalBytesRead atomic.Int64
- totalLatencyNs atomic.Int64
- }
// RDMAReadRequest is the JSON shape of a read request addressed to the RDMA
// sidecar. It identifies a needle by volume, needle ID and cookie, and selects
// a byte range within it.
type RDMAReadRequest struct {
	// VolumeID is the volume that holds the needle.
	VolumeID uint32 `json:"volume_id"`
	// NeedleID identifies the needle within the volume.
	NeedleID uint64 `json:"needle_id"`
	// Cookie is the needle cookie that accompanies NeedleID.
	Cookie uint32 `json:"cookie"`
	// Offset is the starting position of the requested range.
	Offset uint64 `json:"offset"`
	// Size is the length of the requested range.
	Size uint64 `json:"size"`
}
// RDMAReadResponse is the JSON shape of the sidecar's answer to a read
// request.
type RDMAReadResponse struct {
	// Success reports whether the read succeeded.
	Success bool `json:"success"`
	// IsRDMA reports whether the data was transferred over RDMA.
	IsRDMA bool `json:"is_rdma"`
	// Source names the data path that served the read.
	Source string `json:"source"`
	// Duration is the elapsed time as reported by the sidecar, as a string.
	Duration string `json:"duration"`
	// DataSize is the number of payload bytes.
	DataSize int `json:"data_size"`
	// SessionID is omitted from the JSON when empty.
	SessionID string `json:"session_id,omitempty"`
	// ErrorMsg is serialized under the "error" key and omitted when empty.
	ErrorMsg string `json:"error,omitempty"`

	// Zero-copy optimization fields.

	// UseTempFile signals that the payload should be read from TempFile.
	UseTempFile bool `json:"use_temp_file"`
	// TempFile is the path of the temp file that holds the payload.
	TempFile string `json:"temp_file"`
}
// RDMAHealthResponse is the JSON document returned by the sidecar's /health
// endpoint.
type RDMAHealthResponse struct {
	// Status is the overall state; the client accepts only "healthy".
	Status string `json:"status"`
	// RDMA describes the state of the RDMA subsystem.
	RDMA struct {
		// Enabled must be true for the health check to pass.
		Enabled bool `json:"enabled"`
		// Connected being false only produces a warning.
		Connected bool `json:"connected"`
	} `json:"rdma"`
	// Timestamp is the sidecar-reported time of the health report.
	Timestamp string `json:"timestamp"`
}
- // NewRDMAMountClient creates a new RDMA client for mount operations
- func NewRDMAMountClient(sidecarAddr string, lookupFileIdFn wdclient.LookupFileIdFunctionType, maxConcurrent int, timeoutMs int) (*RDMAMountClient, error) {
- client := &RDMAMountClient{
- sidecarAddr: sidecarAddr,
- maxConcurrent: maxConcurrent,
- timeout: time.Duration(timeoutMs) * time.Millisecond,
- httpClient: &http.Client{
- Timeout: time.Duration(timeoutMs) * time.Millisecond,
- },
- semaphore: make(chan struct{}, maxConcurrent),
- lookupFileIdFn: lookupFileIdFn,
- }
- // Test connectivity and RDMA availability
- if err := client.healthCheck(); err != nil {
- return nil, fmt.Errorf("RDMA sidecar health check failed: %w", err)
- }
- glog.Infof("RDMA mount client initialized: sidecar=%s, maxConcurrent=%d, timeout=%v",
- sidecarAddr, maxConcurrent, client.timeout)
- return client, nil
- }
- // lookupVolumeLocationByFileID finds the best volume server for a given file ID
- func (c *RDMAMountClient) lookupVolumeLocationByFileID(ctx context.Context, fileID string) (string, error) {
- glog.V(4).Infof("Looking up volume location for file ID %s", fileID)
- targetUrls, err := c.lookupFileIdFn(ctx, fileID)
- if err != nil {
- return "", fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
- }
- if len(targetUrls) == 0 {
- return "", fmt.Errorf("no locations found for file %s", fileID)
- }
- // Choose the first URL and extract the server address
- targetUrl := targetUrls[0]
- // Extract server address from URL like "http://server:port/fileId"
- parts := strings.Split(targetUrl, "/")
- if len(parts) < 3 {
- return "", fmt.Errorf("invalid target URL format: %s", targetUrl)
- }
- bestAddress := fmt.Sprintf("http://%s", parts[2])
- glog.V(4).Infof("File %s located at %s", fileID, bestAddress)
- return bestAddress, nil
- }
- // lookupVolumeLocation finds the best volume server for a given volume ID (legacy method)
- func (c *RDMAMountClient) lookupVolumeLocation(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (string, error) {
- // Create a file ID for lookup (format: volumeId,needleId,cookie)
- fileID := fmt.Sprintf("%d,%x,%d", volumeID, needleID, cookie)
- return c.lookupVolumeLocationByFileID(ctx, fileID)
- }
- // healthCheck verifies that the RDMA sidecar is available and functioning
- func (c *RDMAMountClient) healthCheck() error {
- ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
- defer cancel()
- req, err := http.NewRequestWithContext(ctx, "GET",
- fmt.Sprintf("http://%s/health", c.sidecarAddr), nil)
- if err != nil {
- return fmt.Errorf("failed to create health check request: %w", err)
- }
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return fmt.Errorf("health check request failed: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("health check failed with status: %s", resp.Status)
- }
- // Parse health response
- var health RDMAHealthResponse
- if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
- return fmt.Errorf("failed to parse health response: %w", err)
- }
- if health.Status != "healthy" {
- return fmt.Errorf("sidecar reports unhealthy status: %s", health.Status)
- }
- if !health.RDMA.Enabled {
- return fmt.Errorf("RDMA is not enabled on sidecar")
- }
- if !health.RDMA.Connected {
- glog.Warningf("RDMA sidecar is healthy but not connected to RDMA engine")
- }
- return nil
- }
- // ReadNeedle reads data from a specific needle using RDMA acceleration
- func (c *RDMAMountClient) ReadNeedle(ctx context.Context, fileID string, offset, size uint64) ([]byte, bool, error) {
- // Acquire semaphore for concurrency control
- select {
- case c.semaphore <- struct{}{}:
- defer func() { <-c.semaphore }()
- case <-ctx.Done():
- return nil, false, ctx.Err()
- }
- c.totalRequests.Add(1)
- startTime := time.Now()
- // Lookup volume location using file ID directly
- volumeServer, err := c.lookupVolumeLocationByFileID(ctx, fileID)
- if err != nil {
- c.failedReads.Add(1)
- return nil, false, fmt.Errorf("failed to lookup volume for file %s: %w", fileID, err)
- }
- // Prepare request URL with file_id parameter (simpler than individual components)
- reqURL := fmt.Sprintf("http://%s/read?file_id=%s&offset=%d&size=%d&volume_server=%s",
- c.sidecarAddr, fileID, offset, size, volumeServer)
- req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
- if err != nil {
- c.failedReads.Add(1)
- return nil, false, fmt.Errorf("failed to create RDMA request: %w", err)
- }
- // Execute request
- resp, err := c.httpClient.Do(req)
- if err != nil {
- c.failedReads.Add(1)
- return nil, false, fmt.Errorf("RDMA request failed: %w", err)
- }
- defer resp.Body.Close()
- duration := time.Since(startTime)
- c.totalLatencyNs.Add(duration.Nanoseconds())
- if resp.StatusCode != http.StatusOK {
- c.failedReads.Add(1)
- body, _ := io.ReadAll(resp.Body)
- return nil, false, fmt.Errorf("RDMA read failed with status %s: %s", resp.Status, string(body))
- }
- // Check if response indicates RDMA was used
- contentType := resp.Header.Get("Content-Type")
- isRDMA := strings.Contains(resp.Header.Get("X-Source"), "rdma") ||
- resp.Header.Get("X-RDMA-Used") == "true"
- // Check for zero-copy temp file optimization
- tempFilePath := resp.Header.Get("X-Temp-File")
- useTempFile := resp.Header.Get("X-Use-Temp-File") == "true"
- var data []byte
- if useTempFile && tempFilePath != "" {
- // Zero-copy path: read from temp file (page cache)
- glog.V(4).Infof("🔥 Using zero-copy temp file: %s", tempFilePath)
- // Allocate buffer for temp file read
- var bufferSize uint64 = 1024 * 1024 // Default 1MB
- if size > 0 {
- bufferSize = size
- }
- buffer := make([]byte, bufferSize)
- n, err := c.readFromTempFile(tempFilePath, buffer)
- if err != nil {
- glog.V(2).Infof("Zero-copy failed, falling back to HTTP body: %v", err)
- // Fall back to reading HTTP body
- data, err = io.ReadAll(resp.Body)
- } else {
- data = buffer[:n]
- glog.V(4).Infof("🔥 Zero-copy successful: %d bytes from page cache", n)
- }
- // Important: Cleanup temp file after reading (consumer responsibility)
- // This prevents accumulation of temp files in /tmp/rdma-cache
- go c.cleanupTempFile(tempFilePath)
- } else {
- // Regular path: read from HTTP response body
- data, err = io.ReadAll(resp.Body)
- }
- if err != nil {
- c.failedReads.Add(1)
- return nil, false, fmt.Errorf("failed to read RDMA response: %w", err)
- }
- c.successfulReads.Add(1)
- c.totalBytesRead.Add(int64(len(data)))
- // Log successful operation
- glog.V(4).Infof("RDMA read completed: fileID=%s, size=%d, duration=%v, rdma=%v, contentType=%s",
- fileID, size, duration, isRDMA, contentType)
- return data, isRDMA, nil
- }
- // cleanupTempFile requests cleanup of a temp file from the sidecar
- func (c *RDMAMountClient) cleanupTempFile(tempFilePath string) {
- if tempFilePath == "" {
- return
- }
- // Give the page cache a brief moment to be utilized before cleanup
- // This preserves the zero-copy performance window
- time.Sleep(100 * time.Millisecond)
- // Call sidecar cleanup endpoint
- cleanupURL := fmt.Sprintf("http://%s/cleanup?temp_file=%s", c.sidecarAddr, url.QueryEscape(tempFilePath))
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- req, err := http.NewRequestWithContext(ctx, "DELETE", cleanupURL, nil)
- if err != nil {
- glog.V(2).Infof("Failed to create cleanup request for %s: %v", tempFilePath, err)
- return
- }
- resp, err := c.httpClient.Do(req)
- if err != nil {
- glog.V(2).Infof("Failed to cleanup temp file %s: %v", tempFilePath, err)
- return
- }
- defer resp.Body.Close()
- if resp.StatusCode == http.StatusOK {
- glog.V(4).Infof("🧹 Temp file cleaned up: %s", tempFilePath)
- } else {
- glog.V(2).Infof("Cleanup failed for %s: status %s", tempFilePath, resp.Status)
- }
- }
- // GetStats returns current RDMA client statistics
- func (c *RDMAMountClient) GetStats() map[string]interface{} {
- totalRequests := c.totalRequests.Load()
- successfulReads := c.successfulReads.Load()
- failedReads := c.failedReads.Load()
- totalBytesRead := c.totalBytesRead.Load()
- totalLatencyNs := c.totalLatencyNs.Load()
- successRate := float64(0)
- avgLatencyNs := int64(0)
- if totalRequests > 0 {
- successRate = float64(successfulReads) / float64(totalRequests) * 100
- avgLatencyNs = totalLatencyNs / totalRequests
- }
- return map[string]interface{}{
- "sidecar_addr": c.sidecarAddr,
- "max_concurrent": c.maxConcurrent,
- "timeout_ms": int(c.timeout / time.Millisecond),
- "total_requests": totalRequests,
- "successful_reads": successfulReads,
- "failed_reads": failedReads,
- "success_rate_pct": fmt.Sprintf("%.1f", successRate),
- "total_bytes_read": totalBytesRead,
- "avg_latency_ns": avgLatencyNs,
- "avg_latency_ms": fmt.Sprintf("%.3f", float64(avgLatencyNs)/1000000),
- }
- }
- // Close shuts down the RDMA client and releases resources
- func (c *RDMAMountClient) Close() error {
- // No need to close semaphore channel; closing it may cause panics if goroutines are still using it.
- // The semaphore will be garbage collected when the client is no longer referenced.
- // Log final statistics
- stats := c.GetStats()
- glog.Infof("RDMA mount client closing: %+v", stats)
- return nil
- }
- // IsHealthy checks if the RDMA sidecar is currently healthy
- func (c *RDMAMountClient) IsHealthy() bool {
- err := c.healthCheck()
- return err == nil
- }
- // readFromTempFile performs zero-copy read from temp file using page cache
- func (c *RDMAMountClient) readFromTempFile(tempFilePath string, buffer []byte) (int, error) {
- if tempFilePath == "" {
- return 0, fmt.Errorf("empty temp file path")
- }
- // Open temp file for reading
- file, err := os.Open(tempFilePath)
- if err != nil {
- return 0, fmt.Errorf("failed to open temp file %s: %w", tempFilePath, err)
- }
- defer file.Close()
- // Read from temp file (this should be served from page cache)
- n, err := file.Read(buffer)
- if err != nil && err != io.EOF {
- return n, fmt.Errorf("failed to read from temp file: %w", err)
- }
- glog.V(4).Infof("🔥 Zero-copy read: %d bytes from temp file %s", n, tempFilePath)
- return n, nil
- }
|