| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- // Package seaweedfs provides SeaweedFS-specific RDMA integration
- package seaweedfs
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "os"
- "path/filepath"
- "time"
- "seaweedfs-rdma-sidecar/pkg/rdma"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/sirupsen/logrus"
- )
- // SeaweedFSRDMAClient provides SeaweedFS-specific RDMA operations
- type SeaweedFSRDMAClient struct {
- rdmaClient *rdma.Client
- logger *logrus.Logger
- volumeServerURL string
- enabled bool
- // Zero-copy optimization
- tempDir string
- useZeroCopy bool
- }
- // Config holds configuration for the SeaweedFS RDMA client
- type Config struct {
- RDMASocketPath string
- VolumeServerURL string
- Enabled bool
- DefaultTimeout time.Duration
- Logger *logrus.Logger
- // Zero-copy optimization
- TempDir string // Directory for temp files (default: /tmp/rdma-cache)
- UseZeroCopy bool // Enable zero-copy via temp files
- // Connection pooling options
- EnablePooling bool // Enable RDMA connection pooling (default: true)
- MaxConnections int // Max connections in pool (default: 10)
- MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
- }
// NeedleReadRequest identifies a SeaweedFS needle, and optionally a byte
// range within it, to be read.
type NeedleReadRequest struct {
	// VolumeID is the volume that holds the needle.
	VolumeID uint32
	// NeedleID is the needle key inside the volume.
	NeedleID uint64
	// Cookie is the needle cookie.
	Cookie uint32
	// Offset is the offset value forwarded with the read.
	Offset uint64
	// Size is the size value forwarded with the read. The HTTP fallback only
	// sends offset/size when at least one of them is non-zero.
	Size uint64
	// VolumeServer overrides the volume server URL for this request.
	VolumeServer string
}
// NeedleReadResponse is the outcome of a needle read.
type NeedleReadResponse struct {
	// Data holds the bytes that were read. It is nil when UseTempFile is true.
	Data []byte
	// IsRDMA is true when the data came from the RDMA fast path.
	IsRDMA bool
	// Latency is the time elapsed between the start of the read and the moment
	// the response was built.
	Latency time.Duration
	// Source names the path that served the read: "rdma", "rdma-zerocopy" or
	// "http".
	Source string
	// SessionID is the RDMA session identifier; it is left empty for HTTP reads.
	SessionID string

	// Zero-copy optimization fields.

	// TempFilePath is the path to the temp file holding the data (zero-copy).
	TempFilePath string
	// UseTempFile tells the caller to read TempFilePath instead of Data.
	UseTempFile bool
}
- // NewSeaweedFSRDMAClient creates a new SeaweedFS RDMA client
- func NewSeaweedFSRDMAClient(config *Config) (*SeaweedFSRDMAClient, error) {
- if config.Logger == nil {
- config.Logger = logrus.New()
- config.Logger.SetLevel(logrus.InfoLevel)
- }
- var rdmaClient *rdma.Client
- if config.Enabled && config.RDMASocketPath != "" {
- rdmaConfig := &rdma.Config{
- EngineSocketPath: config.RDMASocketPath,
- DefaultTimeout: config.DefaultTimeout,
- Logger: config.Logger,
- EnablePooling: config.EnablePooling,
- MaxConnections: config.MaxConnections,
- MaxIdleTime: config.MaxIdleTime,
- }
- rdmaClient = rdma.NewClient(rdmaConfig)
- }
- // Setup temp directory for zero-copy optimization
- tempDir := config.TempDir
- if tempDir == "" {
- tempDir = "/tmp/rdma-cache"
- }
- if config.UseZeroCopy {
- if err := os.MkdirAll(tempDir, 0755); err != nil {
- config.Logger.WithError(err).Warn("Failed to create temp directory, disabling zero-copy")
- config.UseZeroCopy = false
- }
- }
- return &SeaweedFSRDMAClient{
- rdmaClient: rdmaClient,
- logger: config.Logger,
- volumeServerURL: config.VolumeServerURL,
- enabled: config.Enabled,
- tempDir: tempDir,
- useZeroCopy: config.UseZeroCopy,
- }, nil
- }
- // Start initializes the RDMA client connection
- func (c *SeaweedFSRDMAClient) Start(ctx context.Context) error {
- if !c.enabled || c.rdmaClient == nil {
- c.logger.Info("🔄 RDMA disabled, using HTTP fallback only")
- return nil
- }
- c.logger.Info("🚀 Starting SeaweedFS RDMA client...")
- if err := c.rdmaClient.Connect(ctx); err != nil {
- c.logger.WithError(err).Error("❌ Failed to connect to RDMA engine")
- return fmt.Errorf("failed to connect to RDMA engine: %w", err)
- }
- c.logger.Info("✅ SeaweedFS RDMA client started successfully")
- return nil
- }
- // Stop shuts down the RDMA client
- func (c *SeaweedFSRDMAClient) Stop() {
- if c.rdmaClient != nil {
- c.rdmaClient.Disconnect()
- c.logger.Info("🔌 SeaweedFS RDMA client stopped")
- }
- }
- // IsEnabled returns true if RDMA is enabled and available
- func (c *SeaweedFSRDMAClient) IsEnabled() bool {
- return c.enabled && c.rdmaClient != nil && c.rdmaClient.IsConnected()
- }
- // ReadNeedle reads a needle using RDMA fast path or HTTP fallback
- func (c *SeaweedFSRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
- start := time.Now()
- var rdmaErr error
- // Try RDMA fast path first
- if c.IsEnabled() {
- c.logger.WithFields(logrus.Fields{
- "volume_id": req.VolumeID,
- "needle_id": req.NeedleID,
- "offset": req.Offset,
- "size": req.Size,
- }).Debug("🚀 Attempting RDMA fast path")
- rdmaReq := &rdma.ReadRequest{
- VolumeID: req.VolumeID,
- NeedleID: req.NeedleID,
- Cookie: req.Cookie,
- Offset: req.Offset,
- Size: req.Size,
- }
- resp, err := c.rdmaClient.Read(ctx, rdmaReq)
- if err != nil {
- c.logger.WithError(err).Warn("⚠️ RDMA read failed, falling back to HTTP")
- rdmaErr = err
- } else {
- c.logger.WithFields(logrus.Fields{
- "volume_id": req.VolumeID,
- "needle_id": req.NeedleID,
- "bytes_read": resp.BytesRead,
- "transfer_rate": resp.TransferRate,
- "latency": time.Since(start),
- }).Info("🚀 RDMA fast path successful")
- // Try zero-copy optimization if enabled and data is large enough
- if c.useZeroCopy && len(resp.Data) > 64*1024 { // 64KB threshold
- tempFilePath, err := c.writeToTempFile(req, resp.Data)
- if err != nil {
- c.logger.WithError(err).Warn("Failed to write temp file, using regular response")
- // Fall back to regular response
- } else {
- c.logger.WithFields(logrus.Fields{
- "temp_file": tempFilePath,
- "size": len(resp.Data),
- }).Info("🔥 Zero-copy temp file created")
- return &NeedleReadResponse{
- Data: nil, // Don't duplicate data in memory
- IsRDMA: true,
- Latency: time.Since(start),
- Source: "rdma-zerocopy",
- SessionID: resp.SessionID,
- TempFilePath: tempFilePath,
- UseTempFile: true,
- }, nil
- }
- }
- return &NeedleReadResponse{
- Data: resp.Data,
- IsRDMA: true,
- Latency: time.Since(start),
- Source: "rdma",
- SessionID: resp.SessionID,
- }, nil
- }
- }
- // Fallback to HTTP
- c.logger.WithFields(logrus.Fields{
- "volume_id": req.VolumeID,
- "needle_id": req.NeedleID,
- "reason": "rdma_unavailable",
- }).Debug("🌐 Using HTTP fallback")
- data, err := c.httpFallback(ctx, req)
- if err != nil {
- if rdmaErr != nil {
- return nil, fmt.Errorf("both RDMA and HTTP fallback failed: RDMA=%v, HTTP=%v", rdmaErr, err)
- }
- return nil, fmt.Errorf("HTTP fallback failed: %w", err)
- }
- return &NeedleReadResponse{
- Data: data,
- IsRDMA: false,
- Latency: time.Since(start),
- Source: "http",
- }, nil
- }
- // ReadNeedleRange reads a specific range from a needle
- func (c *SeaweedFSRDMAClient) ReadNeedleRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*NeedleReadResponse, error) {
- req := &NeedleReadRequest{
- VolumeID: volumeID,
- NeedleID: needleID,
- Cookie: cookie,
- Offset: offset,
- Size: size,
- }
- return c.ReadNeedle(ctx, req)
- }
- // httpFallback performs HTTP fallback read from SeaweedFS volume server
- func (c *SeaweedFSRDMAClient) httpFallback(ctx context.Context, req *NeedleReadRequest) ([]byte, error) {
- // Use volume server from request, fallback to configured URL
- volumeServerURL := req.VolumeServer
- if volumeServerURL == "" {
- volumeServerURL = c.volumeServerURL
- }
- if volumeServerURL == "" {
- return nil, fmt.Errorf("no volume server URL provided in request or configured")
- }
- // Build URL using existing SeaweedFS file ID construction
- volumeId := needle.VolumeId(req.VolumeID)
- needleId := types.NeedleId(req.NeedleID)
- cookie := types.Cookie(req.Cookie)
- fileId := &needle.FileId{
- VolumeId: volumeId,
- Key: needleId,
- Cookie: cookie,
- }
- url := fmt.Sprintf("%s/%s", volumeServerURL, fileId.String())
- if req.Offset > 0 || req.Size > 0 {
- url += fmt.Sprintf("?offset=%d&size=%d", req.Offset, req.Size)
- }
- c.logger.WithField("url", url).Debug("📥 HTTP fallback request")
- httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
- if err != nil {
- return nil, fmt.Errorf("failed to create HTTP request: %w", err)
- }
- client := &http.Client{Timeout: 30 * time.Second}
- resp, err := client.Do(httpReq)
- if err != nil {
- return nil, fmt.Errorf("HTTP request failed: %w", err)
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("HTTP request failed with status: %d", resp.StatusCode)
- }
- // Read response data - io.ReadAll handles context cancellation and timeouts correctly
- data, err := io.ReadAll(resp.Body)
- if err != nil {
- return nil, fmt.Errorf("failed to read HTTP response body: %w", err)
- }
- c.logger.WithFields(logrus.Fields{
- "volume_id": req.VolumeID,
- "needle_id": req.NeedleID,
- "data_size": len(data),
- }).Debug("📥 HTTP fallback successful")
- return data, nil
- }
- // HealthCheck verifies that the RDMA client is healthy
- func (c *SeaweedFSRDMAClient) HealthCheck(ctx context.Context) error {
- if !c.enabled {
- return fmt.Errorf("RDMA is disabled")
- }
- if c.rdmaClient == nil {
- return fmt.Errorf("RDMA client not initialized")
- }
- if !c.rdmaClient.IsConnected() {
- return fmt.Errorf("RDMA client not connected")
- }
- // Try a ping to the RDMA engine
- _, err := c.rdmaClient.Ping(ctx)
- return err
- }
- // GetStats returns statistics about the RDMA client
- func (c *SeaweedFSRDMAClient) GetStats() map[string]interface{} {
- stats := map[string]interface{}{
- "enabled": c.enabled,
- "volume_server_url": c.volumeServerURL,
- "rdma_socket_path": "",
- }
- if c.rdmaClient != nil {
- stats["connected"] = c.rdmaClient.IsConnected()
- // Note: Capabilities method may not be available, skip for now
- } else {
- stats["connected"] = false
- stats["error"] = "RDMA client not initialized"
- }
- return stats
- }
- // writeToTempFile writes RDMA data to a temp file for zero-copy optimization
- func (c *SeaweedFSRDMAClient) writeToTempFile(req *NeedleReadRequest, data []byte) (string, error) {
- // Create temp file with unique name based on needle info
- fileName := fmt.Sprintf("vol%d_needle%x_cookie%d_offset%d_size%d.tmp",
- req.VolumeID, req.NeedleID, req.Cookie, req.Offset, req.Size)
- tempFilePath := filepath.Join(c.tempDir, fileName)
- // Write data to temp file (this populates the page cache)
- err := os.WriteFile(tempFilePath, data, 0644)
- if err != nil {
- return "", fmt.Errorf("failed to write temp file: %w", err)
- }
- c.logger.WithFields(logrus.Fields{
- "temp_file": tempFilePath,
- "size": len(data),
- }).Debug("📁 Temp file written to page cache")
- return tempFilePath, nil
- }
- // CleanupTempFile removes a temp file (called by mount client after use)
- func (c *SeaweedFSRDMAClient) CleanupTempFile(tempFilePath string) error {
- if tempFilePath == "" {
- return nil
- }
- // Validate that tempFilePath is within c.tempDir
- absTempDir, err := filepath.Abs(c.tempDir)
- if err != nil {
- return fmt.Errorf("failed to resolve temp dir: %w", err)
- }
- absFilePath, err := filepath.Abs(tempFilePath)
- if err != nil {
- return fmt.Errorf("failed to resolve temp file path: %w", err)
- }
- // Ensure absFilePath is within absTempDir
- if !strings.HasPrefix(absFilePath, absTempDir+string(os.PathSeparator)) && absFilePath != absTempDir {
- c.logger.WithField("temp_file", tempFilePath).Warn("Attempted cleanup of file outside temp dir")
- return fmt.Errorf("invalid temp file path")
- }
- err = os.Remove(absFilePath)
- if err != nil && !os.IsNotExist(err) {
- c.logger.WithError(err).WithField("temp_file", absFilePath).Warn("Failed to cleanup temp file")
- return err
- }
- c.logger.WithField("temp_file", absFilePath).Debug("🧹 Temp file cleaned up")
- return nil
- }
|