ec_task.go

package erasure_coding

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
	"github.com/seaweedfs/seaweedfs/weed/worker/types/base"
	"google.golang.org/grpc"
)
// ErasureCodingTask implements the Task interface
type ErasureCodingTask struct {
	*base.BaseTask

	server     string
	volumeID   uint32
	collection string
	workDir    string
	progress   float64

	// EC parameters
	dataShards   int32
	parityShards int32

	targets         []*worker_pb.TaskTarget // Unified targets for EC shards
	sources         []*worker_pb.TaskSource // Unified sources for cleanup
	shardAssignment map[string][]string     // destination -> assigned shard types
}

// NewErasureCodingTask creates a new unified EC task instance
func NewErasureCodingTask(id string, server string, volumeID uint32, collection string) *ErasureCodingTask {
	return &ErasureCodingTask{
		BaseTask:     base.NewBaseTask(id, types.TaskTypeErasureCoding),
		server:       server,
		volumeID:     volumeID,
		collection:   collection,
		dataShards:   erasure_coding.DataShardsCount,   // Default values
		parityShards: erasure_coding.ParityShardsCount, // Default values
	}
}
// Execute implements the UnifiedTask interface
func (t *ErasureCodingTask) Execute(ctx context.Context, params *worker_pb.TaskParams) error {
	if params == nil {
		return fmt.Errorf("task parameters are required")
	}

	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		return fmt.Errorf("erasure coding parameters are required")
	}

	t.dataShards = ecParams.DataShards
	t.parityShards = ecParams.ParityShards
	t.workDir = ecParams.WorkingDir
	t.targets = params.Targets // Get unified targets
	t.sources = params.Sources // Get unified sources

	// Log detailed task information
	t.GetLogger().WithFields(map[string]interface{}{
		"volume_id": t.volumeID,
		"server": t.server,
		"collection": t.collection,
		"data_shards": t.dataShards,
		"parity_shards": t.parityShards,
		"total_shards": t.dataShards + t.parityShards,
		"targets": len(t.targets),
		"sources": len(t.sources),
	}).Info("Starting erasure coding task")

	// Log detailed target server assignments
	for i, target := range t.targets {
		t.GetLogger().WithFields(map[string]interface{}{
			"target_index": i,
			"server": target.Node,
			"shard_ids": target.ShardIds,
			"shard_count": len(target.ShardIds),
		}).Info("Target server shard assignment")
	}

	// Log source information
	for i, source := range t.sources {
		t.GetLogger().WithFields(map[string]interface{}{
			"source_index": i,
			"server": source.Node,
			"volume_id": source.VolumeId,
			"disk_id": source.DiskId,
			"rack": source.Rack,
			"data_center": source.DataCenter,
		}).Info("Source server information")
	}

	// Use the working directory from task parameters, or fall back to a default
	baseWorkDir := t.workDir

	// Create unique working directory for this task
	taskWorkDir := filepath.Join(baseWorkDir, fmt.Sprintf("vol_%d_%d", t.volumeID, time.Now().Unix()))
	if err := os.MkdirAll(taskWorkDir, 0755); err != nil {
		return fmt.Errorf("failed to create task working directory %s: %v", taskWorkDir, err)
	}
	glog.V(1).Infof("Created working directory: %s", taskWorkDir)

	// Update the task's working directory to the specific instance directory
	t.workDir = taskWorkDir
	glog.V(1).Infof("Task working directory configured: %s (logs will be written here)", taskWorkDir)

	// Ensure cleanup of working directory (but preserve logs)
	defer func() {
		// Clean up volume files and EC shards, but preserve the directory structure and any logs
		patterns := []string{"*.dat", "*.idx", "*.ec*", "*.vif"}
		for _, pattern := range patterns {
			matches, err := filepath.Glob(filepath.Join(taskWorkDir, pattern))
			if err != nil {
				continue
			}
			for _, match := range matches {
				if err := os.Remove(match); err != nil {
					glog.V(2).Infof("Could not remove %s: %v", match, err)
				}
			}
		}
		glog.V(1).Infof("Cleaned up volume files from working directory: %s (logs preserved)", taskWorkDir)
	}()

	// Step 1: Mark volume readonly
	t.ReportProgressWithStage(10.0, "Marking volume readonly")
	t.GetLogger().Info("Marking volume readonly")
	if err := t.markVolumeReadonly(); err != nil {
		return fmt.Errorf("failed to mark volume readonly: %v", err)
	}

	// Step 2: Copy volume files to worker
	t.ReportProgressWithStage(25.0, "Copying volume files to worker")
	t.GetLogger().Info("Copying volume files to worker")
	localFiles, err := t.copyVolumeFilesToWorker(taskWorkDir)
	if err != nil {
		return fmt.Errorf("failed to copy volume files: %v", err)
	}

	// Step 3: Generate EC shards locally
	t.ReportProgressWithStage(40.0, "Generating EC shards locally")
	t.GetLogger().Info("Generating EC shards locally")
	shardFiles, err := t.generateEcShardsLocally(localFiles, taskWorkDir)
	if err != nil {
		return fmt.Errorf("failed to generate EC shards: %v", err)
	}

	// Step 4: Distribute shards to destinations
	t.ReportProgressWithStage(60.0, "Distributing EC shards to destinations")
	t.GetLogger().Info("Distributing EC shards to destinations")
	if err := t.distributeEcShards(shardFiles); err != nil {
		return fmt.Errorf("failed to distribute EC shards: %v", err)
	}

	// Step 5: Mount EC shards
	t.ReportProgressWithStage(80.0, "Mounting EC shards")
	t.GetLogger().Info("Mounting EC shards")
	if err := t.mountEcShards(); err != nil {
		return fmt.Errorf("failed to mount EC shards: %v", err)
	}

	// Step 6: Delete original volume
	t.ReportProgressWithStage(90.0, "Deleting original volume")
	t.GetLogger().Info("Deleting original volume")
	if err := t.deleteOriginalVolume(); err != nil {
		return fmt.Errorf("failed to delete original volume: %v", err)
	}

	t.ReportProgressWithStage(100.0, "EC processing complete")
	glog.Infof("EC task completed successfully: volume %d from %s with %d shards distributed",
		t.volumeID, t.server, len(shardFiles))

	return nil
}
// Validate implements the UnifiedTask interface
func (t *ErasureCodingTask) Validate(params *worker_pb.TaskParams) error {
	if params == nil {
		return fmt.Errorf("task parameters are required")
	}

	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		return fmt.Errorf("erasure coding parameters are required")
	}

	if params.VolumeId != t.volumeID {
		return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId)
	}

	// Validate that at least one source matches our server
	found := false
	for _, source := range params.Sources {
		if source.Node == t.server {
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("no source matches expected server %s", t.server)
	}

	if ecParams.DataShards < 1 {
		return fmt.Errorf("invalid data shards: %d (must be >= 1)", ecParams.DataShards)
	}
	if ecParams.ParityShards < 1 {
		return fmt.Errorf("invalid parity shards: %d (must be >= 1)", ecParams.ParityShards)
	}
	if len(params.Targets) < int(ecParams.DataShards+ecParams.ParityShards) {
		return fmt.Errorf("insufficient targets: got %d, need %d", len(params.Targets), ecParams.DataShards+ecParams.ParityShards)
	}

	return nil
}

// EstimateTime implements the UnifiedTask interface
func (t *ErasureCodingTask) EstimateTime(params *worker_pb.TaskParams) time.Duration {
	// Basic estimate based on simulated steps
	return 20 * time.Second // Sum of all step durations
}

// GetProgress returns current progress
func (t *ErasureCodingTask) GetProgress() float64 {
	return t.progress
}
// Helper methods for actual EC operations

// markVolumeReadonly marks the volume as readonly on the source server
func (t *ErasureCodingTask) markVolumeReadonly() error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
				VolumeId: t.volumeID,
			})
			return err
		})
}
// copyVolumeFilesToWorker copies .dat and .idx files from source server to local worker
func (t *ErasureCodingTask) copyVolumeFilesToWorker(workDir string) (map[string]string, error) {
	localFiles := make(map[string]string)

	t.GetLogger().WithFields(map[string]interface{}{
		"volume_id": t.volumeID,
		"source": t.server,
		"working_dir": workDir,
	}).Info("Starting volume file copy from source server")

	// Copy .dat file
	datFile := filepath.Join(workDir, fmt.Sprintf("%d.dat", t.volumeID))
	if err := t.copyFileFromSource(".dat", datFile); err != nil {
		return nil, fmt.Errorf("failed to copy .dat file: %v", err)
	}
	localFiles["dat"] = datFile

	// Log .dat file size
	if info, err := os.Stat(datFile); err == nil {
		t.GetLogger().WithFields(map[string]interface{}{
			"file_type": ".dat",
			"file_path": datFile,
			"size_bytes": info.Size(),
			"size_mb": float64(info.Size()) / (1024 * 1024),
		}).Info("Volume data file copied successfully")
	}

	// Copy .idx file
	idxFile := filepath.Join(workDir, fmt.Sprintf("%d.idx", t.volumeID))
	if err := t.copyFileFromSource(".idx", idxFile); err != nil {
		return nil, fmt.Errorf("failed to copy .idx file: %v", err)
	}
	localFiles["idx"] = idxFile

	// Log .idx file size
	if info, err := os.Stat(idxFile); err == nil {
		t.GetLogger().WithFields(map[string]interface{}{
			"file_type": ".idx",
			"file_path": idxFile,
			"size_bytes": info.Size(),
			"size_mb": float64(info.Size()) / (1024 * 1024),
		}).Info("Volume index file copied successfully")
	}

	return localFiles, nil
}
// copyFileFromSource copies a file from source server to local path using gRPC streaming
func (t *ErasureCodingTask) copyFileFromSource(ext, localPath string) error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
				VolumeId: t.volumeID,
				Collection: t.collection,
				Ext: ext,
				StopOffset: uint64(math.MaxInt64),
			})
			if err != nil {
				return fmt.Errorf("failed to initiate file copy: %v", err)
			}

			// Create local file
			localFile, err := os.Create(localPath)
			if err != nil {
				return fmt.Errorf("failed to create local file %s: %v", localPath, err)
			}
			defer localFile.Close()

			// Stream data and write to local file
			totalBytes := int64(0)
			for {
				resp, err := stream.Recv()
				if err == io.EOF {
					break
				}
				if err != nil {
					return fmt.Errorf("failed to receive file data: %v", err)
				}
				if len(resp.FileContent) > 0 {
					written, writeErr := localFile.Write(resp.FileContent)
					if writeErr != nil {
						return fmt.Errorf("failed to write to local file: %v", writeErr)
					}
					totalBytes += int64(written)
				}
			}

			glog.V(1).Infof("Successfully copied %s (%d bytes) from %s to %s", ext, totalBytes, t.server, localPath)
			return nil
		})
}
// generateEcShardsLocally generates EC shards from local volume files
func (t *ErasureCodingTask) generateEcShardsLocally(localFiles map[string]string, workDir string) (map[string]string, error) {
	datFile := localFiles["dat"]
	idxFile := localFiles["idx"]
	if datFile == "" || idxFile == "" {
		return nil, fmt.Errorf("missing required volume files: dat=%s, idx=%s", datFile, idxFile)
	}

	// Get base name without extension for EC operations
	baseName := strings.TrimSuffix(datFile, ".dat")
	shardFiles := make(map[string]string)

	glog.V(1).Infof("Generating EC shards from local files: dat=%s, idx=%s", datFile, idxFile)

	// Generate EC shard files (.ec00 ~ .ec13)
	if err := erasure_coding.WriteEcFiles(baseName); err != nil {
		return nil, fmt.Errorf("failed to generate EC shard files: %v", err)
	}

	// Generate .ecx file from .idx (use baseName, not full idx path)
	if err := erasure_coding.WriteSortedFileFromIdx(baseName, ".ecx"); err != nil {
		return nil, fmt.Errorf("failed to generate .ecx file: %v", err)
	}

	// Collect generated shard file paths and log details
	var generatedShards []string
	var totalShardSize int64
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		shardFile := fmt.Sprintf("%s.ec%02d", baseName, i)
		if info, err := os.Stat(shardFile); err == nil {
			shardKey := fmt.Sprintf("ec%02d", i)
			shardFiles[shardKey] = shardFile
			generatedShards = append(generatedShards, shardKey)
			totalShardSize += info.Size()

			// Log individual shard details
			t.GetLogger().WithFields(map[string]interface{}{
				"shard_id": i,
				"shard_type": shardKey,
				"file_path": shardFile,
				"size_bytes": info.Size(),
				"size_kb": float64(info.Size()) / 1024,
			}).Info("EC shard generated")
		}
	}

	// Add metadata files
	ecxFile := baseName + ".ecx"
	if info, err := os.Stat(ecxFile); err == nil {
		shardFiles["ecx"] = ecxFile
		t.GetLogger().WithFields(map[string]interface{}{
			"file_type": "ecx",
			"file_path": ecxFile,
			"size_bytes": info.Size(),
		}).Info("EC index file generated")
	}

	// Generate .vif file (volume info)
	vifFile := baseName + ".vif"
	volumeInfo := &volume_server_pb.VolumeInfo{
		Version: uint32(needle.GetCurrentVersion()),
	}
	if err := volume_info.SaveVolumeInfo(vifFile, volumeInfo); err != nil {
		glog.Warningf("Failed to create .vif file: %v", err)
	} else {
		shardFiles["vif"] = vifFile
		if info, err := os.Stat(vifFile); err == nil {
			t.GetLogger().WithFields(map[string]interface{}{
				"file_type": "vif",
				"file_path": vifFile,
				"size_bytes": info.Size(),
			}).Info("Volume info file generated")
		}
	}

	// Log summary of generation
	t.GetLogger().WithFields(map[string]interface{}{
		"total_files": len(shardFiles),
		"ec_shards": len(generatedShards),
		"generated_shards": generatedShards,
		"total_shard_size_mb": float64(totalShardSize) / (1024 * 1024),
	}).Info("EC shard generation completed")

	return shardFiles, nil
}
// distributeEcShards distributes locally generated EC shards to destination servers
// using pre-assigned shard IDs from planning phase
func (t *ErasureCodingTask) distributeEcShards(shardFiles map[string]string) error {
	if len(t.targets) == 0 {
		return fmt.Errorf("no targets specified for EC shard distribution")
	}
	if len(shardFiles) == 0 {
		return fmt.Errorf("no shard files available for distribution")
	}

	// Build shard assignment from pre-assigned target shard IDs (from planning phase)
	shardAssignment := make(map[string][]string)
	for _, target := range t.targets {
		if len(target.ShardIds) == 0 {
			continue // Skip targets with no assigned shards
		}

		var assignedShards []string
		// Convert shard IDs to shard file names (e.g., 0 → "ec00", 1 → "ec01")
		for _, shardId := range target.ShardIds {
			shardType := fmt.Sprintf("ec%02d", shardId)
			assignedShards = append(assignedShards, shardType)
		}

		// Add metadata files (.ecx, .vif) to targets that have shards
		if len(assignedShards) > 0 {
			if _, hasEcx := shardFiles["ecx"]; hasEcx {
				assignedShards = append(assignedShards, "ecx")
			}
			if _, hasVif := shardFiles["vif"]; hasVif {
				assignedShards = append(assignedShards, "vif")
			}
		}

		shardAssignment[target.Node] = assignedShards
	}

	if len(shardAssignment) == 0 {
		return fmt.Errorf("no shard assignments found from planning phase")
	}

	// Store assignment for use during mounting
	t.shardAssignment = shardAssignment

	// Send assigned shards to each destination
	for destNode, assignedShards := range shardAssignment {
		t.GetLogger().WithFields(map[string]interface{}{
			"destination": destNode,
			"assigned_shards": len(assignedShards),
			"shard_types": assignedShards,
		}).Info("Starting shard distribution to destination server")

		// Send only the assigned shards to this destination
		var transferredBytes int64
		for _, shardType := range assignedShards {
			filePath, exists := shardFiles[shardType]
			if !exists {
				return fmt.Errorf("shard file %s not found for destination %s", shardType, destNode)
			}

			// Log file size before transfer
			if info, err := os.Stat(filePath); err == nil {
				transferredBytes += info.Size()
				t.GetLogger().WithFields(map[string]interface{}{
					"destination": destNode,
					"shard_type": shardType,
					"file_path": filePath,
					"size_bytes": info.Size(),
					"size_kb": float64(info.Size()) / 1024,
				}).Info("Starting shard file transfer")
			}

			if err := t.sendShardFileToDestination(destNode, filePath, shardType); err != nil {
				return fmt.Errorf("failed to send %s to %s: %v", shardType, destNode, err)
			}

			t.GetLogger().WithFields(map[string]interface{}{
				"destination": destNode,
				"shard_type": shardType,
			}).Info("Shard file transfer completed")
		}

		// Log summary for this destination
		t.GetLogger().WithFields(map[string]interface{}{
			"destination": destNode,
			"shards_transferred": len(assignedShards),
			"total_bytes": transferredBytes,
			"total_mb": float64(transferredBytes) / (1024 * 1024),
		}).Info("All shards distributed to destination server")
	}

	glog.V(1).Infof("Successfully distributed EC shards to %d destinations", len(shardAssignment))
	return nil
}
// sendShardFileToDestination sends a single shard file to a destination server using ReceiveFile API
func (t *ErasureCodingTask) sendShardFileToDestination(destServer, filePath, shardType string) error {
	return operation.WithVolumeServerClient(false, pb.ServerAddress(destServer), grpc.WithInsecure(),
		func(client volume_server_pb.VolumeServerClient) error {
			// Open the local shard file
			file, err := os.Open(filePath)
			if err != nil {
				return fmt.Errorf("failed to open shard file %s: %v", filePath, err)
			}
			defer file.Close()

			// Get file size
			fileInfo, err := file.Stat()
			if err != nil {
				return fmt.Errorf("failed to get file info for %s: %v", filePath, err)
			}

			// Determine file extension and shard ID
			var ext string
			var shardId uint32
			if shardType == "ecx" {
				ext = ".ecx"
				shardId = 0 // ecx file doesn't have a specific shard ID
			} else if shardType == "vif" {
				ext = ".vif"
				shardId = 0 // vif file doesn't have a specific shard ID
			} else if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
				// EC shard file like "ec00", "ec01", etc.
				ext = "." + shardType
				fmt.Sscanf(shardType[2:], "%d", &shardId)
			} else {
				return fmt.Errorf("unknown shard type: %s", shardType)
			}

			// Create streaming client
			stream, err := client.ReceiveFile(context.Background())
			if err != nil {
				return fmt.Errorf("failed to create receive stream: %v", err)
			}

			// Send file info first
			err = stream.Send(&volume_server_pb.ReceiveFileRequest{
				Data: &volume_server_pb.ReceiveFileRequest_Info{
					Info: &volume_server_pb.ReceiveFileInfo{
						VolumeId: t.volumeID,
						Ext: ext,
						Collection: t.collection,
						IsEcVolume: true,
						ShardId: shardId,
						FileSize: uint64(fileInfo.Size()),
					},
				},
			})
			if err != nil {
				return fmt.Errorf("failed to send file info: %v", err)
			}

			// Send file content in chunks
			buffer := make([]byte, 64*1024) // 64KB chunks
			for {
				n, readErr := file.Read(buffer)
				if n > 0 {
					err = stream.Send(&volume_server_pb.ReceiveFileRequest{
						Data: &volume_server_pb.ReceiveFileRequest_FileContent{
							FileContent: buffer[:n],
						},
					})
					if err != nil {
						return fmt.Errorf("failed to send file content: %v", err)
					}
				}
				if readErr == io.EOF {
					break
				}
				if readErr != nil {
					return fmt.Errorf("failed to read file: %v", readErr)
				}
			}

			// Close stream and get response
			resp, err := stream.CloseAndRecv()
			if err != nil {
				return fmt.Errorf("failed to close stream: %v", err)
			}
			if resp.Error != "" {
				return fmt.Errorf("server error: %s", resp.Error)
			}

			glog.V(2).Infof("Successfully sent %s (%d bytes) to %s", shardType, resp.BytesWritten, destServer)
			return nil
		})
}
// mountEcShards mounts EC shards on destination servers
func (t *ErasureCodingTask) mountEcShards() error {
	if t.shardAssignment == nil {
		return fmt.Errorf("shard assignment not available for mounting")
	}

	// Mount only assigned shards on each destination
	for destNode, assignedShards := range t.shardAssignment {
		// Convert shard names to shard IDs for mounting
		var shardIds []uint32
		var metadataFiles []string
		for _, shardType := range assignedShards {
			// Skip metadata files (.ecx, .vif) - only mount EC shards
			if strings.HasPrefix(shardType, "ec") && len(shardType) == 4 {
				// Parse shard ID from "ec00", "ec01", etc.
				var shardId uint32
				if _, err := fmt.Sscanf(shardType[2:], "%d", &shardId); err == nil {
					shardIds = append(shardIds, shardId)
				}
			} else {
				metadataFiles = append(metadataFiles, shardType)
			}
		}

		t.GetLogger().WithFields(map[string]interface{}{
			"destination": destNode,
			"shard_ids": shardIds,
			"shard_count": len(shardIds),
			"metadata_files": metadataFiles,
		}).Info("Starting EC shard mount operation")

		if len(shardIds) == 0 {
			t.GetLogger().WithFields(map[string]interface{}{
				"destination": destNode,
				"metadata_files": metadataFiles,
			}).Info("No EC shards to mount (only metadata files)")
			continue
		}

		err := operation.WithVolumeServerClient(false, pb.ServerAddress(destNode), grpc.WithInsecure(),
			func(client volume_server_pb.VolumeServerClient) error {
				_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
					VolumeId: t.volumeID,
					Collection: t.collection,
					ShardIds: shardIds,
				})
				return mountErr
			})
		if err != nil {
			t.GetLogger().WithFields(map[string]interface{}{
				"destination": destNode,
				"shard_ids": shardIds,
				"error": err.Error(),
			}).Error("Failed to mount EC shards")
		} else {
			t.GetLogger().WithFields(map[string]interface{}{
				"destination": destNode,
				"shard_ids": shardIds,
				"volume_id": t.volumeID,
				"collection": t.collection,
			}).Info("Successfully mounted EC shards")
		}
	}

	return nil
}
// deleteOriginalVolume deletes the original volume and all its replicas from all servers
func (t *ErasureCodingTask) deleteOriginalVolume() error {
	// Get replicas from task parameters (set during detection)
	replicas := t.getReplicas()
	if len(replicas) == 0 {
		glog.Warningf("No replicas found for volume %d, falling back to source server only", t.volumeID)
		replicas = []string{t.server}
	}

	t.GetLogger().WithFields(map[string]interface{}{
		"volume_id": t.volumeID,
		"replica_count": len(replicas),
		"replica_servers": replicas,
	}).Info("Starting original volume deletion from replica servers")

	// Delete volume from all replica locations
	var deleteErrors []string
	successCount := 0
	for i, replicaServer := range replicas {
		t.GetLogger().WithFields(map[string]interface{}{
			"replica_index": i + 1,
			"total_replicas": len(replicas),
			"server": replicaServer,
			"volume_id": t.volumeID,
		}).Info("Deleting volume from replica server")

		err := operation.WithVolumeServerClient(false, pb.ServerAddress(replicaServer), grpc.WithInsecure(),
			func(client volume_server_pb.VolumeServerClient) error {
				_, err := client.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
					VolumeId: t.volumeID,
					OnlyEmpty: false, // Force delete since we've created EC shards
				})
				return err
			})
		if err != nil {
			deleteErrors = append(deleteErrors, fmt.Sprintf("failed to delete volume %d from %s: %v", t.volumeID, replicaServer, err))
			t.GetLogger().WithFields(map[string]interface{}{
				"server": replicaServer,
				"volume_id": t.volumeID,
				"error": err.Error(),
			}).Error("Failed to delete volume from replica server")
		} else {
			successCount++
			t.GetLogger().WithFields(map[string]interface{}{
				"server": replicaServer,
				"volume_id": t.volumeID,
			}).Info("Successfully deleted volume from replica server")
		}
	}

	// Report results
	if len(deleteErrors) > 0 {
		t.GetLogger().WithFields(map[string]interface{}{
			"volume_id": t.volumeID,
			"successful": successCount,
			"failed": len(deleteErrors),
			"total_replicas": len(replicas),
			"success_rate": float64(successCount) / float64(len(replicas)) * 100,
			"errors": deleteErrors,
		}).Warning("Some volume deletions failed")
		// Don't return error - EC task should still be considered successful if shards are mounted
	} else {
		t.GetLogger().WithFields(map[string]interface{}{
			"volume_id": t.volumeID,
			"replica_count": len(replicas),
			"replica_servers": replicas,
		}).Info("Successfully deleted volume from all replica servers")
	}

	return nil
}
// getReplicas extracts replica servers from unified sources
func (t *ErasureCodingTask) getReplicas() []string {
	var replicas []string
	for _, source := range t.sources {
		// Only include volume replica sources (not EC shard sources).
		// Assumption: VolumeId == 0 is considered invalid and should be excluded.
		// If volume ID 0 is valid in some contexts, update this check accordingly.
		if source.VolumeId > 0 {
			replicas = append(replicas, source.Node)
		}
	}
	return replicas
}
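
// Example usage (illustrative sketch only; the scheduler wiring shown here is an
// assumption and is not part of this file). A caller is expected to construct the
// task with NewErasureCodingTask, validate the planned parameters, and then run
// Execute. Here "params" stands for a *worker_pb.TaskParams prepared by the
// planning phase, carrying ErasureCodingParams plus the planned Targets and Sources.
//
//	task := NewErasureCodingTask("task-001", "10.0.0.1:8080", 42, "default")
//	if err := task.Validate(params); err != nil {
//		return fmt.Errorf("invalid EC task parameters: %v", err)
//	}
//	if err := task.Execute(context.Background(), params); err != nil {
//		return fmt.Errorf("EC task failed: %v", err)
//	}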