| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- // Package rdma provides high-level RDMA operations for SeaweedFS integration
- package rdma
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "seaweedfs-rdma-sidecar/pkg/ipc"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/sirupsen/logrus"
- )
- // PooledConnection represents a pooled RDMA connection
- type PooledConnection struct {
- ipcClient *ipc.Client
- lastUsed time.Time
- inUse bool
- sessionID string
- created time.Time
- }
- // ConnectionPool manages a pool of RDMA connections
- type ConnectionPool struct {
- connections []*PooledConnection
- mutex sync.RWMutex
- maxConnections int
- maxIdleTime time.Duration
- enginePath string
- logger *logrus.Logger
- }
- // Client provides high-level RDMA operations with connection pooling
- type Client struct {
- pool *ConnectionPool
- logger *logrus.Logger
- enginePath string
- capabilities *ipc.GetCapabilitiesResponse
- connected bool
- defaultTimeout time.Duration
- // Legacy single connection (for backward compatibility)
- ipcClient *ipc.Client
- }
- // Config holds configuration for the RDMA client
- type Config struct {
- EngineSocketPath string
- DefaultTimeout time.Duration
- Logger *logrus.Logger
- // Connection pooling options
- EnablePooling bool // Enable connection pooling (default: true)
- MaxConnections int // Max connections in pool (default: 10)
- MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
- }
// ReadRequest describes one RDMA read of a SeaweedFS needle, or of a byte
// range within it.
type ReadRequest struct {
	VolumeID  uint32  // volume that stores the needle
	NeedleID  uint64  // needle key within the volume
	Cookie    uint32  // needle cookie taken from the file ID
	Offset    uint64  // first byte to read within the needle
	Size      uint64  // bytes to read; ReadFull passes 0 to mean the whole needle
	AuthToken *string // optional token forwarded in the engine request; nil when unused
}
- // ReadResponse represents the result of an RDMA read operation
- type ReadResponse struct {
- Data []byte
- BytesRead uint64
- Duration time.Duration
- TransferRate float64
- SessionID string
- Success bool
- Message string
- }
- // NewConnectionPool creates a new connection pool
- func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool {
- if maxConnections <= 0 {
- maxConnections = 10 // Default
- }
- if maxIdleTime <= 0 {
- maxIdleTime = 5 * time.Minute // Default
- }
- return &ConnectionPool{
- connections: make([]*PooledConnection, 0, maxConnections),
- maxConnections: maxConnections,
- maxIdleTime: maxIdleTime,
- enginePath: enginePath,
- logger: logger,
- }
- }
- // getConnection gets an available connection from the pool or creates a new one
- func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- // Look for an available connection
- for _, conn := range p.connections {
- if !conn.inUse && time.Since(conn.lastUsed) < p.maxIdleTime {
- conn.inUse = true
- conn.lastUsed = time.Now()
- p.logger.WithField("session_id", conn.sessionID).Debug("🔌 Reusing pooled RDMA connection")
- return conn, nil
- }
- }
- // Create new connection if under limit
- if len(p.connections) < p.maxConnections {
- ipcClient := ipc.NewClient(p.enginePath, p.logger)
- if err := ipcClient.Connect(ctx); err != nil {
- return nil, fmt.Errorf("failed to create new pooled connection: %w", err)
- }
- conn := &PooledConnection{
- ipcClient: ipcClient,
- lastUsed: time.Now(),
- inUse: true,
- sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), time.Now().Unix()),
- created: time.Now(),
- }
- p.connections = append(p.connections, conn)
- p.logger.WithFields(logrus.Fields{
- "session_id": conn.sessionID,
- "pool_size": len(p.connections),
- }).Info("🚀 Created new pooled RDMA connection")
- return conn, nil
- }
- // Pool is full, wait for an available connection
- return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections)
- }
// releaseConnection marks conn as free again and refreshes its lastUsed time,
// which restarts its idle clock. conn must have come from getConnection on the
// same pool.
func (p *ConnectionPool) releaseConnection(conn *PooledConnection) {
	p.mutex.Lock()
	conn.lastUsed = time.Now()
	conn.inUse = false
	p.mutex.Unlock()

	// sessionID is fixed at creation, so it is safe to read after unlocking.
	p.logger.WithField("session_id", conn.sessionID).Debug("🔄 Released RDMA connection back to pool")
}
// cleanup disconnects and removes every connection that is not in use and has
// been idle for at least maxIdleTime. Busy connections are always kept.
func (p *ConnectionPool) cleanup() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	now := time.Now()
	survivors := make([]*PooledConnection, 0, len(p.connections))
	for _, conn := range p.connections {
		idleFor := now.Sub(conn.lastUsed)
		if !conn.inUse && idleFor >= p.maxIdleTime {
			conn.ipcClient.Disconnect()
			p.logger.WithFields(logrus.Fields{
				"session_id": conn.sessionID,
				"idle_time":  idleFor,
			}).Debug("🧹 Cleaned up idle RDMA connection")
			continue
		}
		survivors = append(survivors, conn)
	}
	p.connections = survivors
}
- // Close closes all connections in the pool
- func (p *ConnectionPool) Close() {
- p.mutex.Lock()
- defer p.mutex.Unlock()
- for _, conn := range p.connections {
- conn.ipcClient.Disconnect()
- }
- p.connections = nil
- p.logger.Info("🔌 Connection pool closed")
- }
- // NewClient creates a new RDMA client
- func NewClient(config *Config) *Client {
- if config.Logger == nil {
- config.Logger = logrus.New()
- config.Logger.SetLevel(logrus.InfoLevel)
- }
- if config.DefaultTimeout == 0 {
- config.DefaultTimeout = 30 * time.Second
- }
- client := &Client{
- logger: config.Logger,
- enginePath: config.EngineSocketPath,
- defaultTimeout: config.DefaultTimeout,
- }
- // Initialize connection pooling if enabled (default: true)
- enablePooling := config.EnablePooling
- if config.MaxConnections == 0 && config.MaxIdleTime == 0 {
- // Default to enabled if not explicitly configured
- enablePooling = true
- }
- if enablePooling {
- client.pool = NewConnectionPool(
- config.EngineSocketPath,
- config.MaxConnections,
- config.MaxIdleTime,
- config.Logger,
- )
- // Start cleanup goroutine
- go client.startCleanupRoutine()
- config.Logger.WithFields(logrus.Fields{
- "max_connections": client.pool.maxConnections,
- "max_idle_time": client.pool.maxIdleTime,
- }).Info("🔌 RDMA connection pooling enabled")
- } else {
- // Legacy single connection mode
- client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger)
- config.Logger.Info("🔌 RDMA single connection mode (pooling disabled)")
- }
- return client
- }
- // startCleanupRoutine starts a background goroutine to clean up idle connections
- func (c *Client) startCleanupRoutine() {
- ticker := time.NewTicker(1 * time.Minute) // Cleanup every minute
- go func() {
- defer ticker.Stop()
- for range ticker.C {
- if c.pool != nil {
- c.pool.cleanup()
- }
- }
- }()
- }
// Connect prepares the client for use.
//
// In pooling mode it only marks the client connected; pooled connections are
// dialed lazily and capabilities are not queried. In single-connection mode it
// performs these steps:
//  1. dials the IPC socket;
//  2. pings the engine;
//  3. fetches and stores the engine capabilities.
//
// If the ping or the capability query fails, the IPC connection is closed
// again before the error is returned.
func (c *Client) Connect(ctx context.Context) error {
	c.logger.Info("🚀 Connecting to RDMA engine")

	if c.pool != nil {
		// Connection pooling mode - connections are created on-demand.
		c.connected = true
		c.logger.Info("✅ RDMA client ready (connection pooling enabled)")
		return nil
	}

	// Single connection mode.
	if err := c.ipcClient.Connect(ctx); err != nil {
		return fmt.Errorf("failed to connect to IPC: %w", err)
	}

	// Test connectivity with ping. Measure the client-side round trip
	// separately from the RTT the server reports.
	clientID := "rdma-client"
	pingStart := time.Now()
	pong, err := c.ipcClient.Ping(ctx, &clientID)
	if err != nil {
		c.ipcClient.Disconnect()
		return fmt.Errorf("failed to ping RDMA engine: %w", err)
	}
	latency := time.Since(pingStart)
	c.logger.WithFields(logrus.Fields{
		"latency":    latency,
		"server_rtt": time.Duration(pong.ServerRttNs),
	}).Info("📡 RDMA engine ping successful")

	caps, err := c.ipcClient.GetCapabilities(ctx, &clientID)
	if err != nil {
		c.ipcClient.Disconnect()
		return fmt.Errorf("failed to get engine capabilities: %w", err)
	}
	c.capabilities = caps
	c.connected = true

	c.logger.WithFields(logrus.Fields{
		"version":           caps.Version,
		"device_name":       caps.DeviceName,
		"vendor_id":         caps.VendorId,
		"max_sessions":      caps.MaxSessions,
		"max_transfer_size": caps.MaxTransferSize,
		"active_sessions":   caps.ActiveSessions,
		"real_rdma":         caps.RealRdma,
		"port_gid":          caps.PortGid,
		"port_lid":          caps.PortLid,
	}).Info("✅ RDMA engine connected and ready")
	return nil
}
- // Disconnect closes the connection to the RDMA engine
- func (c *Client) Disconnect() {
- if c.connected {
- if c.pool != nil {
- // Connection pooling mode
- c.pool.Close()
- c.logger.Info("🔌 Disconnected from RDMA engine (pool closed)")
- } else {
- // Single connection mode
- c.ipcClient.Disconnect()
- c.logger.Info("🔌 Disconnected from RDMA engine")
- }
- c.connected = false
- }
- }
- // IsConnected returns true if connected to the RDMA engine
- func (c *Client) IsConnected() bool {
- if c.pool != nil {
- // Connection pooling mode - always connected if pool exists
- return c.connected
- } else {
- // Single connection mode
- return c.connected && c.ipcClient.IsConnected()
- }
- }
- // GetCapabilities returns the RDMA engine capabilities
- func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse {
- return c.capabilities
- }
// Read performs an RDMA read operation for a SeaweedFS needle.
//
// In pooling mode the work is delegated to readWithPool. Otherwise the read
// runs over the single legacy IPC connection: StartRead opens an engine
// session, and CompleteRead finishes it with the engine-reported transfer size
// and expected CRC.
//
// It returns an error in these cases:
//   - the client is not connected;
//   - req is nil;
//   - either IPC call fails;
//   - the engine reports an unsuccessful completion.
//
// NOTE: the returned Data is placeholder content (see the mock section below).
// Only BytesRead, Duration, TransferRate and SessionID reflect the engine
// exchange.
func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
	if !c.IsConnected() {
		return nil, fmt.Errorf("not connected to RDMA engine")
	}
	if req == nil {
		return nil, fmt.Errorf("read request must not be nil")
	}

	startTime := time.Now()
	c.logger.WithFields(logrus.Fields{
		"volume_id": req.VolumeID,
		"needle_id": req.NeedleID,
		"offset":    req.Offset,
		"size":      req.Size,
	}).Debug("📖 Starting RDMA read operation")

	if c.pool != nil {
		// Connection pooling mode.
		return c.readWithPool(ctx, req, startTime)
	}

	// Single connection mode.
	ipcReq := &ipc.StartReadRequest{
		VolumeID:    req.VolumeID,
		NeedleID:    req.NeedleID,
		Cookie:      req.Cookie,
		Offset:      req.Offset,
		Size:        req.Size,
		RemoteAddr:  0, // Will be set by engine (mock for now)
		RemoteKey:   0, // Will be set by engine (mock for now)
		TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
		AuthToken:   req.AuthToken,
	}

	startResp, err := c.ipcClient.StartRead(ctx, ipcReq)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to start RDMA read")
		return nil, fmt.Errorf("failed to start RDMA read: %w", err)
	}

	// In the new protocol, receiving a StartReadResponse means the session was opened.
	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"local_addr":    fmt.Sprintf("0x%x", startResp.LocalAddr),
		"local_key":     startResp.LocalKey,
		"transfer_size": startResp.TransferSize,
		"expected_crc":  fmt.Sprintf("0x%x", startResp.ExpectedCrc),
		"expires_at":    time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
	}).Debug("📖 RDMA read session started")

	completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to complete RDMA read")
		return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
	}

	duration := time.Since(startTime)

	if !completeResp.Success {
		errorMsg := "unknown error"
		if completeResp.Message != nil {
			errorMsg = *completeResp.Message
		}
		c.logger.WithFields(logrus.Fields{
			"session_id":    startResp.SessionID,
			"error_message": errorMsg,
		}).Error("❌ RDMA read completion failed")
		return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
	}

	// Transfer rate in bytes/second. Guard against a zero duration (possible with
	// coarse clocks or a mock engine), which would otherwise yield +Inf or NaN.
	transferRate := 0.0
	if secs := duration.Seconds(); secs > 0 {
		transferRate = float64(startResp.TransferSize) / secs
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"bytes_read":    startResp.TransferSize,
		"duration":      duration,
		"transfer_rate": transferRate,
		"server_crc":    completeResp.ServerCrc,
	}).Info("✅ RDMA read completed successfully")

	// MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
	//
	// This section generates placeholder data for the mock RDMA implementation.
	// In a production RDMA implementation, this should be replaced with:
	//
	// 1. The actual data transferred via RDMA from the remote memory region
	// 2. Data validation using checksums/CRC from the RDMA completion
	// 3. Proper error handling for RDMA transfer failures
	// 4. Memory region cleanup and deregistration
	//
	// TODO for real RDMA implementation:
	// - Replace mockData with actual RDMA buffer contents
	// - Validate data integrity using server CRC: completeResp.ServerCrc
	// - Handle partial transfers and retry logic
	// - Implement proper memory management for RDMA regions
	//
	// Current mock behavior: generates a simple pattern (0,1,2...255,0,1,2...).
	// This allows testing of the integration pipeline without real hardware.
	mockData := make([]byte, startResp.TransferSize)
	for i := range mockData {
		mockData[i] = byte(i % 256) // Simple repeating pattern for verification
	}
	// END MOCK DATA IMPLEMENTATION

	return &ReadResponse{
		Data:         mockData,
		BytesRead:    startResp.TransferSize,
		Duration:     duration,
		TransferRate: transferRate,
		SessionID:    startResp.SessionID,
		Success:      true,
		Message:      "RDMA read completed successfully",
	}, nil
}
// ReadRange reads size bytes starting at offset within the needle identified
// by (volumeID, needleID, cookie). No auth token is sent.
func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) {
	return c.Read(ctx, &ReadRequest{
		VolumeID: volumeID,
		NeedleID: needleID,
		Cookie:   cookie,
		Offset:   offset,
		Size:     size,
	})
}
// ReadFileRange reads size bytes starting at offset from the needle named by a
// SeaweedFS file ID string such as "3,01637037d6". The volume, needle key and
// cookie are decoded by parseFileID. No auth token is sent.
func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) {
	vid, key, ck, parseErr := parseFileID(fileID)
	if parseErr != nil {
		return nil, fmt.Errorf("invalid file ID %s: %w", fileID, parseErr)
	}
	return c.ReadRange(ctx, vid, key, ck, offset, size)
}
// parseFileID splits a SeaweedFS file ID into its volume ID, needle key and
// cookie. It delegates to needle.ParseFileIdFromString so the parsing matches
// SeaweedFS itself, and wraps any parse error with the offending ID.
func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) {
	fid, parseErr := needle.ParseFileIdFromString(fileId)
	if parseErr != nil {
		return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, parseErr)
	}
	return uint32(fid.VolumeId), uint64(fid.Key), uint32(fid.Cookie), nil
}
// ReadFull reads an entire needle identified by (volumeID, needleID, cookie).
// No auth token is sent.
func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) {
	// Offset 0 with Size 0 is this package's convention for "read entire needle".
	whole := &ReadRequest{
		VolumeID: volumeID,
		NeedleID: needleID,
		Cookie:   cookie,
	}
	return c.Read(ctx, whole)
}
// Ping sends a health-check ping to the RDMA engine and returns the
// client-observed round-trip time.
//
// In pooling mode, Client.ipcClient is nil, so the ping goes over a connection
// borrowed from the pool (dialing one if needed) and returned afterwards. The
// measured latency excludes the time spent acquiring that connection.
func (c *Client) Ping(ctx context.Context) (time.Duration, error) {
	if !c.IsConnected() {
		return 0, fmt.Errorf("not connected to RDMA engine")
	}

	ipcClient := c.ipcClient
	if c.pool != nil {
		conn, err := c.pool.getConnection(ctx)
		if err != nil {
			return 0, fmt.Errorf("failed to get pooled connection: %w", err)
		}
		defer c.pool.releaseConnection(conn)
		ipcClient = conn.ipcClient
	}

	clientID := "health-check"
	start := time.Now()
	pong, err := ipcClient.Ping(ctx, &clientID)
	if err != nil {
		return 0, err
	}
	totalLatency := time.Since(start)

	c.logger.WithFields(logrus.Fields{
		"total_latency": totalLatency,
		"server_rtt":    time.Duration(pong.ServerRttNs),
		"client_id":     clientID,
	}).Debug("🏓 RDMA engine ping successful")
	return totalLatency, nil
}
// readWithPool performs one RDMA read over a connection borrowed from c.pool.
// The connection is returned to the pool when the call finishes, whether it
// succeeds or fails.
//
// startTime is when Read began, so Duration and TransferRate include the time
// spent acquiring the pooled connection.
//
// The returned SessionID is the engine's read session ID, matching the
// single-connection path in Read. The pool's own connection label is logged as
// pool_conn_id.
//
// NOTE: the returned Data is placeholder content produced by the mock
// implementation below.
func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) {
	conn, err := c.pool.getConnection(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get pooled connection: %w", err)
	}
	defer c.pool.releaseConnection(conn)

	c.logger.WithField("pool_conn_id", conn.sessionID).Debug("🔌 Using pooled RDMA connection")

	ipcReq := &ipc.StartReadRequest{
		VolumeID:    req.VolumeID,
		NeedleID:    req.NeedleID,
		Cookie:      req.Cookie,
		Offset:      req.Offset,
		Size:        req.Size,
		RemoteAddr:  0, // Will be set by engine (mock for now)
		RemoteKey:   0, // Will be set by engine (mock for now)
		TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
		AuthToken:   req.AuthToken,
	}

	startResp, err := conn.ipcClient.StartRead(ctx, ipcReq)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to start RDMA read (pooled)")
		return nil, fmt.Errorf("failed to start RDMA read: %w", err)
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"pool_conn_id":  conn.sessionID,
		"local_addr":    fmt.Sprintf("0x%x", startResp.LocalAddr),
		"local_key":     startResp.LocalKey,
		"transfer_size": startResp.TransferSize,
		"expected_crc":  fmt.Sprintf("0x%x", startResp.ExpectedCrc),
		"expires_at":    time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
		"pooled":        true,
	}).Debug("📖 RDMA read session started (pooled)")

	completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
	if err != nil {
		c.logger.WithError(err).Error("❌ Failed to complete RDMA read (pooled)")
		return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
	}

	duration := time.Since(startTime)

	if !completeResp.Success {
		errorMsg := "unknown error"
		if completeResp.Message != nil {
			errorMsg = *completeResp.Message
		}
		c.logger.WithFields(logrus.Fields{
			"session_id":    startResp.SessionID,
			"pool_conn_id":  conn.sessionID,
			"error_message": errorMsg,
			"pooled":        true,
		}).Error("❌ RDMA read completion failed (pooled)")
		return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
	}

	// Transfer rate in bytes/second. Guard against a zero duration, which would
	// otherwise yield +Inf or NaN.
	transferRate := 0.0
	if secs := duration.Seconds(); secs > 0 {
		transferRate = float64(startResp.TransferSize) / secs
	}

	c.logger.WithFields(logrus.Fields{
		"session_id":    startResp.SessionID,
		"pool_conn_id":  conn.sessionID,
		"bytes_read":    startResp.TransferSize,
		"duration":      duration,
		"transfer_rate": transferRate,
		"server_crc":    completeResp.ServerCrc,
		"pooled":        true,
	}).Info("✅ RDMA read completed successfully (pooled)")

	// For the mock implementation, we return placeholder data.
	// In the real implementation, this would be the actual RDMA-transferred data.
	mockData := make([]byte, startResp.TransferSize)
	for i := range mockData {
		mockData[i] = byte(i % 256) // Simple pattern for testing
	}

	return &ReadResponse{
		Data:         mockData,
		BytesRead:    startResp.TransferSize,
		Duration:     duration,
		TransferRate: transferRate,
		SessionID:    startResp.SessionID,
		Success:      true,
		Message:      "RDMA read successful (pooled)",
	}, nil
}
|