The RDMA client (RDMAMountClient) exists in weed/mount/rdma_client.go but is not yet integrated into the actual file read path. The integration points are identified but not implemented.
// File: weed/mount/weedfs_file_read.go:41
func (wfs *WFS) Read(cancel <-chan struct{}, in *fuse.ReadIn, buff []byte) (fuse.ReadResult, fuse.Status) {
fh := wfs.GetHandle(FileHandleId(in.Fh))
// ...
offset := int64(in.Offset)
totalRead, err := readDataByFileHandleWithContext(ctx, buff, fh, offset)
// ...
return fuse.ReadResultData(buff[:totalRead]), fuse.OK
}
// File: weed/mount/weedfs_file_read.go:103
func readDataByFileHandleWithContext(ctx context.Context, buff []byte, fhIn *FileHandle, offset int64) (int64, error) {
size := len(buff)
fhIn.lockForRead(offset, size)
defer fhIn.unlockForRead(offset, size)
// KEY INTEGRATION POINT: This is where RDMA should be attempted
n, tsNs, err := fhIn.readFromChunksWithContext(ctx, buff, offset)
// ...
return n, err
}
// File: weed/mount/filehandle_read.go:29
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
// ...
// CURRENT: Direct chunk reading without RDMA
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
// MISSING: RDMA integration should happen here
return int64(totalRead), ts, err
}
The integration should happen in readFromChunksWithContext like this:
func (fh *FileHandle) readFromChunksWithContext(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
// ... existing code ...
// NEW: Try RDMA acceleration first
if fh.wfs.rdmaClient != nil && fh.wfs.rdmaClient.IsHealthy() {
if totalRead, ts, err := fh.tryRDMARead(ctx, buff, offset); err == nil {
glog.V(4).Infof("RDMA read successful: %d bytes", totalRead)
return totalRead, ts, nil
}
glog.V(2).Infof("RDMA read failed, falling back to HTTP")
}
// FALLBACK: Original HTTP-based chunk reading
totalRead, ts, err := fh.entryChunkGroup.ReadDataAt(ctx, fileSize, buff, offset)
return int64(totalRead), ts, err
}
// File: weed/mount/rdma_client.go:129
func (c *RDMAMountClient) ReadNeedle(ctx context.Context, volumeID uint32, needleID uint64, cookie uint32, offset, size uint64) ([]byte, bool, error) {
// Prepare request URL
reqURL := fmt.Sprintf("http://%s/read?volume=%d&needle=%d&cookie=%d&offset=%d&size=%d",
c.sidecarAddr, volumeID, needleID, cookie, offset, size)
// Execute HTTP request to RDMA sidecar
resp, err := c.httpClient.Do(req)
// ...
// Return data with RDMA metadata
return data, isRDMA, nil
}
// File: seaweedfs-rdma-sidecar/cmd/demo-server/main.go:375
func (s *DemoServer) readHandler(w http.ResponseWriter, r *http.Request) {
// Parse volume, needle, cookie from URL parameters
volumeID, _ := strconv.ParseUint(query.Get("volume"), 10, 32)
needleID, _ := strconv.ParseUint(query.Get("needle"), 10, 64)
// Use distributed client for volume lookup + RDMA
if s.useDistributed && s.distributedClient != nil {
resp, err = s.distributedClient.ReadNeedle(ctx, req)
} else {
resp, err = s.rdmaClient.ReadNeedle(ctx, req) // Local RDMA
}
// Return binary data or JSON metadata
w.Write(resp.Data)
}
// File: seaweedfs-rdma-sidecar/pkg/seaweedfs/distributed_client.go:45
func (c *DistributedRDMAClient) ReadNeedle(ctx context.Context, req *NeedleReadRequest) (*NeedleReadResponse, error) {
// Step 1: Lookup volume location from master
locations, err := c.locationService.LookupVolume(ctx, req.VolumeID)
// Step 2: Find best server (local preferred)
bestLocation := c.locationService.FindBestLocation(locations)
// Step 3: Make HTTP request to target server's RDMA sidecar
return c.makeRDMARequest(ctx, req, bestLocation, start)
}
// File: rdma-engine/src/ipc.rs:403
async fn handle_start_read(req: StartReadRequest, ...) -> RdmaResult<StartReadResponse> {
// Create RDMA session
let session_id = Uuid::new_v4().to_string();
let buffer = vec![0u8; transfer_size as usize];
// Register memory for RDMA
let memory_region = rdma_context.register_memory(local_addr, transfer_size).await?;
// Perform RDMA read (mock implementation)
rdma_context.post_read(local_addr, remote_addr, remote_key, size, wr_id).await?;
let completions = rdma_context.poll_completion(1).await?;
// Return session info
Ok(StartReadResponse { session_id, local_addr, ... })
}
// File: weed/mount/weedfs.go (needs modification)
type WFS struct {
// ... existing fields ...
rdmaClient *RDMAMountClient // ADD THIS
}
// File: weed/command/mount.go (needs modification)
func runMount(cmd *cobra.Command, args []string) bool {
// ... existing code ...
// NEW: Initialize RDMA client if enabled
var rdmaClient *mount.RDMAMountClient
if *mountOptions.rdmaEnabled && *mountOptions.rdmaSidecarAddr != "" {
rdmaClient, err = mount.NewRDMAMountClient(
*mountOptions.rdmaSidecarAddr,
*mountOptions.rdmaMaxConcurrent,
*mountOptions.rdmaTimeoutMs,
)
if err != nil {
glog.Warningf("Failed to initialize RDMA client: %v", err)
}
}
// Pass RDMA client to WFS
wfs := mount.NewSeaweedFileSystem(&mount.Option{
// ... existing options ...
RDMAClient: rdmaClient, // ADD THIS
})
}
// File: weed/mount/filehandle_read.go (needs new method)
func (fh *FileHandle) tryRDMARead(ctx context.Context, buff []byte, offset int64) (int64, int64, error) {
entry := fh.GetEntry()
// Find which chunk contains the requested offset
for _, chunk := range entry.GetEntry().Chunks {
if offset >= chunk.Offset && offset < chunk.Offset+int64(chunk.Size) {
// Parse chunk.FileId to get volume, needle, cookie
volumeID, needleID, cookie, err := ParseFileId(chunk.FileId)
if err != nil {
return 0, 0, err
}
// Calculate offset within the chunk
chunkOffset := uint64(offset - chunk.Offset)
readSize := uint64(min(len(buff), int(chunk.Size-chunkOffset)))
// Make RDMA request
data, isRDMA, err := fh.wfs.rdmaClient.ReadNeedle(
ctx, volumeID, needleID, cookie, chunkOffset, readSize)
if err != nil {
return 0, 0, err
}
// Copy data to buffer
copied := copy(buff, data)
return int64(copied), time.Now().UnixNano(), nil
}
}
return 0, 0, fmt.Errorf("chunk not found for offset %d", offset)
}
read() system callWFS.Read()readDataByFileHandleWithContext()fh.readFromChunksWithContext()RDMAMountClient.ReadNeedle()RDMAMountClient with HTTP communicationtryRDMARead() method not implementedtryRDMARead() methodreadFromChunksWithContext()The architecture is sound and most components exist - only the final integration wiring is needed!