client.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. // Package rdma provides high-level RDMA operations for SeaweedFS integration
  2. package rdma
  3. import (
  4. "context"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "seaweedfs-rdma-sidecar/pkg/ipc"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "github.com/sirupsen/logrus"
  11. )
  12. // PooledConnection represents a pooled RDMA connection
  13. type PooledConnection struct {
  14. ipcClient *ipc.Client
  15. lastUsed time.Time
  16. inUse bool
  17. sessionID string
  18. created time.Time
  19. }
  20. // ConnectionPool manages a pool of RDMA connections
  21. type ConnectionPool struct {
  22. connections []*PooledConnection
  23. mutex sync.RWMutex
  24. maxConnections int
  25. maxIdleTime time.Duration
  26. enginePath string
  27. logger *logrus.Logger
  28. }
  29. // Client provides high-level RDMA operations with connection pooling
  30. type Client struct {
  31. pool *ConnectionPool
  32. logger *logrus.Logger
  33. enginePath string
  34. capabilities *ipc.GetCapabilitiesResponse
  35. connected bool
  36. defaultTimeout time.Duration
  37. // Legacy single connection (for backward compatibility)
  38. ipcClient *ipc.Client
  39. }
  40. // Config holds configuration for the RDMA client
  41. type Config struct {
  42. EngineSocketPath string
  43. DefaultTimeout time.Duration
  44. Logger *logrus.Logger
  45. // Connection pooling options
  46. EnablePooling bool // Enable connection pooling (default: true)
  47. MaxConnections int // Max connections in pool (default: 10)
  48. MaxIdleTime time.Duration // Max idle time before connection cleanup (default: 5min)
  49. }
  50. // ReadRequest represents a SeaweedFS needle read request
  51. type ReadRequest struct {
  52. VolumeID uint32
  53. NeedleID uint64
  54. Cookie uint32
  55. Offset uint64
  56. Size uint64
  57. AuthToken *string
  58. }
  59. // ReadResponse represents the result of an RDMA read operation
  60. type ReadResponse struct {
  61. Data []byte
  62. BytesRead uint64
  63. Duration time.Duration
  64. TransferRate float64
  65. SessionID string
  66. Success bool
  67. Message string
  68. }
  69. // NewConnectionPool creates a new connection pool
  70. func NewConnectionPool(enginePath string, maxConnections int, maxIdleTime time.Duration, logger *logrus.Logger) *ConnectionPool {
  71. if maxConnections <= 0 {
  72. maxConnections = 10 // Default
  73. }
  74. if maxIdleTime <= 0 {
  75. maxIdleTime = 5 * time.Minute // Default
  76. }
  77. return &ConnectionPool{
  78. connections: make([]*PooledConnection, 0, maxConnections),
  79. maxConnections: maxConnections,
  80. maxIdleTime: maxIdleTime,
  81. enginePath: enginePath,
  82. logger: logger,
  83. }
  84. }
  85. // getConnection gets an available connection from the pool or creates a new one
  86. func (p *ConnectionPool) getConnection(ctx context.Context) (*PooledConnection, error) {
  87. p.mutex.Lock()
  88. defer p.mutex.Unlock()
  89. // Look for an available connection
  90. for _, conn := range p.connections {
  91. if !conn.inUse && time.Since(conn.lastUsed) < p.maxIdleTime {
  92. conn.inUse = true
  93. conn.lastUsed = time.Now()
  94. p.logger.WithField("session_id", conn.sessionID).Debug("🔌 Reusing pooled RDMA connection")
  95. return conn, nil
  96. }
  97. }
  98. // Create new connection if under limit
  99. if len(p.connections) < p.maxConnections {
  100. ipcClient := ipc.NewClient(p.enginePath, p.logger)
  101. if err := ipcClient.Connect(ctx); err != nil {
  102. return nil, fmt.Errorf("failed to create new pooled connection: %w", err)
  103. }
  104. conn := &PooledConnection{
  105. ipcClient: ipcClient,
  106. lastUsed: time.Now(),
  107. inUse: true,
  108. sessionID: fmt.Sprintf("pool-%d-%d", len(p.connections), time.Now().Unix()),
  109. created: time.Now(),
  110. }
  111. p.connections = append(p.connections, conn)
  112. p.logger.WithFields(logrus.Fields{
  113. "session_id": conn.sessionID,
  114. "pool_size": len(p.connections),
  115. }).Info("🚀 Created new pooled RDMA connection")
  116. return conn, nil
  117. }
  118. // Pool is full, wait for an available connection
  119. return nil, fmt.Errorf("connection pool exhausted (max: %d)", p.maxConnections)
  120. }
  121. // releaseConnection returns a connection to the pool
  122. func (p *ConnectionPool) releaseConnection(conn *PooledConnection) {
  123. p.mutex.Lock()
  124. defer p.mutex.Unlock()
  125. conn.inUse = false
  126. conn.lastUsed = time.Now()
  127. p.logger.WithField("session_id", conn.sessionID).Debug("🔄 Released RDMA connection back to pool")
  128. }
  129. // cleanup removes idle connections from the pool
  130. func (p *ConnectionPool) cleanup() {
  131. p.mutex.Lock()
  132. defer p.mutex.Unlock()
  133. now := time.Now()
  134. activeConnections := make([]*PooledConnection, 0, len(p.connections))
  135. for _, conn := range p.connections {
  136. if conn.inUse || now.Sub(conn.lastUsed) < p.maxIdleTime {
  137. activeConnections = append(activeConnections, conn)
  138. } else {
  139. // Close idle connection
  140. conn.ipcClient.Disconnect()
  141. p.logger.WithFields(logrus.Fields{
  142. "session_id": conn.sessionID,
  143. "idle_time": now.Sub(conn.lastUsed),
  144. }).Debug("🧹 Cleaned up idle RDMA connection")
  145. }
  146. }
  147. p.connections = activeConnections
  148. }
  149. // Close closes all connections in the pool
  150. func (p *ConnectionPool) Close() {
  151. p.mutex.Lock()
  152. defer p.mutex.Unlock()
  153. for _, conn := range p.connections {
  154. conn.ipcClient.Disconnect()
  155. }
  156. p.connections = nil
  157. p.logger.Info("🔌 Connection pool closed")
  158. }
  159. // NewClient creates a new RDMA client
  160. func NewClient(config *Config) *Client {
  161. if config.Logger == nil {
  162. config.Logger = logrus.New()
  163. config.Logger.SetLevel(logrus.InfoLevel)
  164. }
  165. if config.DefaultTimeout == 0 {
  166. config.DefaultTimeout = 30 * time.Second
  167. }
  168. client := &Client{
  169. logger: config.Logger,
  170. enginePath: config.EngineSocketPath,
  171. defaultTimeout: config.DefaultTimeout,
  172. }
  173. // Initialize connection pooling if enabled (default: true)
  174. enablePooling := config.EnablePooling
  175. if config.MaxConnections == 0 && config.MaxIdleTime == 0 {
  176. // Default to enabled if not explicitly configured
  177. enablePooling = true
  178. }
  179. if enablePooling {
  180. client.pool = NewConnectionPool(
  181. config.EngineSocketPath,
  182. config.MaxConnections,
  183. config.MaxIdleTime,
  184. config.Logger,
  185. )
  186. // Start cleanup goroutine
  187. go client.startCleanupRoutine()
  188. config.Logger.WithFields(logrus.Fields{
  189. "max_connections": client.pool.maxConnections,
  190. "max_idle_time": client.pool.maxIdleTime,
  191. }).Info("🔌 RDMA connection pooling enabled")
  192. } else {
  193. // Legacy single connection mode
  194. client.ipcClient = ipc.NewClient(config.EngineSocketPath, config.Logger)
  195. config.Logger.Info("🔌 RDMA single connection mode (pooling disabled)")
  196. }
  197. return client
  198. }
  199. // startCleanupRoutine starts a background goroutine to clean up idle connections
  200. func (c *Client) startCleanupRoutine() {
  201. ticker := time.NewTicker(1 * time.Minute) // Cleanup every minute
  202. go func() {
  203. defer ticker.Stop()
  204. for range ticker.C {
  205. if c.pool != nil {
  206. c.pool.cleanup()
  207. }
  208. }
  209. }()
  210. }
  211. // Connect establishes connection to the Rust RDMA engine and queries capabilities
  212. func (c *Client) Connect(ctx context.Context) error {
  213. c.logger.Info("🚀 Connecting to RDMA engine")
  214. if c.pool != nil {
  215. // Connection pooling mode - connections are created on-demand
  216. c.connected = true
  217. c.logger.Info("✅ RDMA client ready (connection pooling enabled)")
  218. return nil
  219. }
  220. // Single connection mode
  221. if err := c.ipcClient.Connect(ctx); err != nil {
  222. return fmt.Errorf("failed to connect to IPC: %w", err)
  223. }
  224. // Test connectivity with ping
  225. clientID := "rdma-client"
  226. pong, err := c.ipcClient.Ping(ctx, &clientID)
  227. if err != nil {
  228. c.ipcClient.Disconnect()
  229. return fmt.Errorf("failed to ping RDMA engine: %w", err)
  230. }
  231. latency := time.Duration(pong.ServerRttNs)
  232. c.logger.WithFields(logrus.Fields{
  233. "latency": latency,
  234. "server_rtt": time.Duration(pong.ServerRttNs),
  235. }).Info("📡 RDMA engine ping successful")
  236. // Get capabilities
  237. caps, err := c.ipcClient.GetCapabilities(ctx, &clientID)
  238. if err != nil {
  239. c.ipcClient.Disconnect()
  240. return fmt.Errorf("failed to get engine capabilities: %w", err)
  241. }
  242. c.capabilities = caps
  243. c.connected = true
  244. c.logger.WithFields(logrus.Fields{
  245. "version": caps.Version,
  246. "device_name": caps.DeviceName,
  247. "vendor_id": caps.VendorId,
  248. "max_sessions": caps.MaxSessions,
  249. "max_transfer_size": caps.MaxTransferSize,
  250. "active_sessions": caps.ActiveSessions,
  251. "real_rdma": caps.RealRdma,
  252. "port_gid": caps.PortGid,
  253. "port_lid": caps.PortLid,
  254. }).Info("✅ RDMA engine connected and ready")
  255. return nil
  256. }
  257. // Disconnect closes the connection to the RDMA engine
  258. func (c *Client) Disconnect() {
  259. if c.connected {
  260. if c.pool != nil {
  261. // Connection pooling mode
  262. c.pool.Close()
  263. c.logger.Info("🔌 Disconnected from RDMA engine (pool closed)")
  264. } else {
  265. // Single connection mode
  266. c.ipcClient.Disconnect()
  267. c.logger.Info("🔌 Disconnected from RDMA engine")
  268. }
  269. c.connected = false
  270. }
  271. }
  272. // IsConnected returns true if connected to the RDMA engine
  273. func (c *Client) IsConnected() bool {
  274. if c.pool != nil {
  275. // Connection pooling mode - always connected if pool exists
  276. return c.connected
  277. } else {
  278. // Single connection mode
  279. return c.connected && c.ipcClient.IsConnected()
  280. }
  281. }
  282. // GetCapabilities returns the RDMA engine capabilities
  283. func (c *Client) GetCapabilities() *ipc.GetCapabilitiesResponse {
  284. return c.capabilities
  285. }
  286. // Read performs an RDMA read operation for a SeaweedFS needle
  287. func (c *Client) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
  288. if !c.IsConnected() {
  289. return nil, fmt.Errorf("not connected to RDMA engine")
  290. }
  291. startTime := time.Now()
  292. c.logger.WithFields(logrus.Fields{
  293. "volume_id": req.VolumeID,
  294. "needle_id": req.NeedleID,
  295. "offset": req.Offset,
  296. "size": req.Size,
  297. }).Debug("📖 Starting RDMA read operation")
  298. if c.pool != nil {
  299. // Connection pooling mode
  300. return c.readWithPool(ctx, req, startTime)
  301. }
  302. // Single connection mode
  303. // Create IPC request
  304. ipcReq := &ipc.StartReadRequest{
  305. VolumeID: req.VolumeID,
  306. NeedleID: req.NeedleID,
  307. Cookie: req.Cookie,
  308. Offset: req.Offset,
  309. Size: req.Size,
  310. RemoteAddr: 0, // Will be set by engine (mock for now)
  311. RemoteKey: 0, // Will be set by engine (mock for now)
  312. TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
  313. AuthToken: req.AuthToken,
  314. }
  315. // Start RDMA read
  316. startResp, err := c.ipcClient.StartRead(ctx, ipcReq)
  317. if err != nil {
  318. c.logger.WithError(err).Error("❌ Failed to start RDMA read")
  319. return nil, fmt.Errorf("failed to start RDMA read: %w", err)
  320. }
  321. // In the new protocol, if we got a StartReadResponse, the operation was successful
  322. c.logger.WithFields(logrus.Fields{
  323. "session_id": startResp.SessionID,
  324. "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
  325. "local_key": startResp.LocalKey,
  326. "transfer_size": startResp.TransferSize,
  327. "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
  328. "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
  329. }).Debug("📖 RDMA read session started")
  330. // Complete the RDMA read
  331. completeResp, err := c.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
  332. if err != nil {
  333. c.logger.WithError(err).Error("❌ Failed to complete RDMA read")
  334. return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
  335. }
  336. duration := time.Since(startTime)
  337. if !completeResp.Success {
  338. errorMsg := "unknown error"
  339. if completeResp.Message != nil {
  340. errorMsg = *completeResp.Message
  341. }
  342. c.logger.WithFields(logrus.Fields{
  343. "session_id": startResp.SessionID,
  344. "error_message": errorMsg,
  345. }).Error("❌ RDMA read completion failed")
  346. return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
  347. }
  348. // Calculate transfer rate (bytes/second)
  349. transferRate := float64(startResp.TransferSize) / duration.Seconds()
  350. c.logger.WithFields(logrus.Fields{
  351. "session_id": startResp.SessionID,
  352. "bytes_read": startResp.TransferSize,
  353. "duration": duration,
  354. "transfer_rate": transferRate,
  355. "server_crc": completeResp.ServerCrc,
  356. }).Info("✅ RDMA read completed successfully")
  357. // MOCK DATA IMPLEMENTATION - FOR DEVELOPMENT/TESTING ONLY
  358. //
  359. // This section generates placeholder data for the mock RDMA implementation.
  360. // In a production RDMA implementation, this should be replaced with:
  361. //
  362. // 1. The actual data transferred via RDMA from the remote memory region
  363. // 2. Data validation using checksums/CRC from the RDMA completion
  364. // 3. Proper error handling for RDMA transfer failures
  365. // 4. Memory region cleanup and deregistration
  366. //
  367. // TODO for real RDMA implementation:
  368. // - Replace mockData with actual RDMA buffer contents
  369. // - Validate data integrity using server CRC: completeResp.ServerCrc
  370. // - Handle partial transfers and retry logic
  371. // - Implement proper memory management for RDMA regions
  372. //
  373. // Current mock behavior: Generates a simple pattern (0,1,2...255,0,1,2...)
  374. // This allows testing of the integration pipeline without real hardware
  375. mockData := make([]byte, startResp.TransferSize)
  376. for i := range mockData {
  377. mockData[i] = byte(i % 256) // Simple repeating pattern for verification
  378. }
  379. // END MOCK DATA IMPLEMENTATION
  380. return &ReadResponse{
  381. Data: mockData,
  382. BytesRead: startResp.TransferSize,
  383. Duration: duration,
  384. TransferRate: transferRate,
  385. SessionID: startResp.SessionID,
  386. Success: true,
  387. Message: "RDMA read completed successfully",
  388. }, nil
  389. }
  390. // ReadRange performs an RDMA read for a specific range within a needle
  391. func (c *Client) ReadRange(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) (*ReadResponse, error) {
  392. req := &ReadRequest{
  393. VolumeID: volumeID,
  394. NeedleID: needleID,
  395. Cookie: cookie,
  396. Offset: offset,
  397. Size: size,
  398. }
  399. return c.Read(ctx, req)
  400. }
  401. // ReadFileRange performs an RDMA read using SeaweedFS file ID format
  402. func (c *Client) ReadFileRange(ctx context.Context, fileID string, offset, size uint64) (*ReadResponse, error) {
  403. // Parse file ID (e.g., "3,01637037d6" -> volume=3, needle=0x01637037d6, cookie extracted)
  404. volumeID, needleID, cookie, err := parseFileID(fileID)
  405. if err != nil {
  406. return nil, fmt.Errorf("invalid file ID %s: %w", fileID, err)
  407. }
  408. req := &ReadRequest{
  409. VolumeID: volumeID,
  410. NeedleID: needleID,
  411. Cookie: cookie,
  412. Offset: offset,
  413. Size: size,
  414. }
  415. return c.Read(ctx, req)
  416. }
  417. // parseFileID extracts volume ID, needle ID, and cookie from a SeaweedFS file ID
  418. // Uses existing SeaweedFS parsing logic to ensure compatibility
  419. func parseFileID(fileId string) (volumeID uint32, needleID uint64, cookie uint32, err error) {
  420. // Use existing SeaweedFS file ID parsing
  421. fid, err := needle.ParseFileIdFromString(fileId)
  422. if err != nil {
  423. return 0, 0, 0, fmt.Errorf("failed to parse file ID %s: %w", fileId, err)
  424. }
  425. volumeID = uint32(fid.VolumeId)
  426. needleID = uint64(fid.Key)
  427. cookie = uint32(fid.Cookie)
  428. return volumeID, needleID, cookie, nil
  429. }
  430. // ReadFull performs an RDMA read for an entire needle
  431. func (c *Client) ReadFull(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32) (*ReadResponse, error) {
  432. req := &ReadRequest{
  433. VolumeID: volumeID,
  434. NeedleID: needleID,
  435. Cookie: cookie,
  436. Offset: 0,
  437. Size: 0, // 0 means read entire needle
  438. }
  439. return c.Read(ctx, req)
  440. }
  441. // Ping tests connectivity to the RDMA engine
  442. func (c *Client) Ping(ctx context.Context) (time.Duration, error) {
  443. if !c.IsConnected() {
  444. return 0, fmt.Errorf("not connected to RDMA engine")
  445. }
  446. clientID := "health-check"
  447. start := time.Now()
  448. pong, err := c.ipcClient.Ping(ctx, &clientID)
  449. if err != nil {
  450. return 0, err
  451. }
  452. totalLatency := time.Since(start)
  453. serverRtt := time.Duration(pong.ServerRttNs)
  454. c.logger.WithFields(logrus.Fields{
  455. "total_latency": totalLatency,
  456. "server_rtt": serverRtt,
  457. "client_id": clientID,
  458. }).Debug("🏓 RDMA engine ping successful")
  459. return totalLatency, nil
  460. }
  461. // readWithPool performs RDMA read using connection pooling
  462. func (c *Client) readWithPool(ctx context.Context, req *ReadRequest, startTime time.Time) (*ReadResponse, error) {
  463. // Get connection from pool
  464. conn, err := c.pool.getConnection(ctx)
  465. if err != nil {
  466. return nil, fmt.Errorf("failed to get pooled connection: %w", err)
  467. }
  468. defer c.pool.releaseConnection(conn)
  469. c.logger.WithField("session_id", conn.sessionID).Debug("🔌 Using pooled RDMA connection")
  470. // Create IPC request
  471. ipcReq := &ipc.StartReadRequest{
  472. VolumeID: req.VolumeID,
  473. NeedleID: req.NeedleID,
  474. Cookie: req.Cookie,
  475. Offset: req.Offset,
  476. Size: req.Size,
  477. RemoteAddr: 0, // Will be set by engine (mock for now)
  478. RemoteKey: 0, // Will be set by engine (mock for now)
  479. TimeoutSecs: uint64(c.defaultTimeout.Seconds()),
  480. AuthToken: req.AuthToken,
  481. }
  482. // Start RDMA read
  483. startResp, err := conn.ipcClient.StartRead(ctx, ipcReq)
  484. if err != nil {
  485. c.logger.WithError(err).Error("❌ Failed to start RDMA read (pooled)")
  486. return nil, fmt.Errorf("failed to start RDMA read: %w", err)
  487. }
  488. c.logger.WithFields(logrus.Fields{
  489. "session_id": startResp.SessionID,
  490. "local_addr": fmt.Sprintf("0x%x", startResp.LocalAddr),
  491. "local_key": startResp.LocalKey,
  492. "transfer_size": startResp.TransferSize,
  493. "expected_crc": fmt.Sprintf("0x%x", startResp.ExpectedCrc),
  494. "expires_at": time.Unix(0, int64(startResp.ExpiresAtNs)).Format(time.RFC3339),
  495. "pooled": true,
  496. }).Debug("📖 RDMA read session started (pooled)")
  497. // Complete the RDMA read
  498. completeResp, err := conn.ipcClient.CompleteRead(ctx, startResp.SessionID, true, startResp.TransferSize, &startResp.ExpectedCrc)
  499. if err != nil {
  500. c.logger.WithError(err).Error("❌ Failed to complete RDMA read (pooled)")
  501. return nil, fmt.Errorf("failed to complete RDMA read: %w", err)
  502. }
  503. duration := time.Since(startTime)
  504. if !completeResp.Success {
  505. errorMsg := "unknown error"
  506. if completeResp.Message != nil {
  507. errorMsg = *completeResp.Message
  508. }
  509. c.logger.WithFields(logrus.Fields{
  510. "session_id": conn.sessionID,
  511. "error_message": errorMsg,
  512. "pooled": true,
  513. }).Error("❌ RDMA read completion failed (pooled)")
  514. return nil, fmt.Errorf("RDMA read completion failed: %s", errorMsg)
  515. }
  516. // Calculate transfer rate (bytes/second)
  517. transferRate := float64(startResp.TransferSize) / duration.Seconds()
  518. c.logger.WithFields(logrus.Fields{
  519. "session_id": conn.sessionID,
  520. "bytes_read": startResp.TransferSize,
  521. "duration": duration,
  522. "transfer_rate": transferRate,
  523. "server_crc": completeResp.ServerCrc,
  524. "pooled": true,
  525. }).Info("✅ RDMA read completed successfully (pooled)")
  526. // For the mock implementation, we'll return placeholder data
  527. // In the real implementation, this would be the actual RDMA transferred data
  528. mockData := make([]byte, startResp.TransferSize)
  529. for i := range mockData {
  530. mockData[i] = byte(i % 256) // Simple pattern for testing
  531. }
  532. return &ReadResponse{
  533. Data: mockData,
  534. BytesRead: startResp.TransferSize,
  535. Duration: duration,
  536. TransferRate: transferRate,
  537. SessionID: conn.sessionID,
  538. Success: true,
  539. Message: "RDMA read successful (pooled)",
  540. }, nil
  541. }