filer_server_handlers_copy.go

package weed_server

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"golang.org/x/sync/errgroup"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
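
// copy handles an HTTP file-copy request: the request path is the
// destination and the "cp.from" query parameter names the source file.
// A minimal usage sketch, assuming the default filer address and that the
// handler is reached via POST (the exact routing lives in the caller):
//
//	curl -X POST "http://localhost:8888/target/dir/?cp.from=/source/file.txt"
//
// Directory sources are rejected, and an existing destination is not
// overwritten.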
func (fs *FilerServer) copy(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
	src := r.URL.Query().Get("cp.from")
	dst := r.URL.Path

	glog.V(2).InfofCtx(ctx, "FilerServer.copy %v to %v", src, dst)

	var err error
	if src, err = clearName(src); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}
	if dst, err = clearName(dst); err != nil {
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	src = strings.TrimRight(src, "/")
	if src == "" {
		err = fmt.Errorf("invalid source '/'")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcPath := util.FullPath(src)
	dstPath := util.FullPath(dst)
	if dstPath.IsLongerFileName(so.MaxFileNameLength) {
		err = fmt.Errorf("dst name too long")
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
	if err != nil {
		err = fmt.Errorf("failed to get src entry '%s': %w", src, err)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy source entry: content_len=%d, chunks_len=%d", len(srcEntry.Content), len(srcEntry.GetChunks()))

	// Check if source is a directory - currently not supported for recursive copying
	if srcEntry.IsDirectory() {
		err = fmt.Errorf("copy: directory copying not yet supported for '%s'", src)
		writeJsonError(w, r, http.StatusBadRequest, err)
		return
	}

	_, oldName := srcPath.DirAndName()
	finalDstPath := dstPath

	// Check if destination is a directory
	dstPathEntry, findErr := fs.filer.FindEntry(ctx, dstPath)
	if findErr != nil && findErr != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination path %s: %w", dstPath, findErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}
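
	// If dst is an existing directory, copy into it under the source's name;
	// otherwise treat dst as the full target path, falling back to the source
	// name when dst has no name component (trailing slash).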
	if findErr == nil && dstPathEntry.IsDirectory() {
		finalDstPath = dstPath.Child(oldName)
	} else {
		newDir, newName := dstPath.DirAndName()
		newName = util.Nvl(newName, oldName)
		finalDstPath = util.FullPath(newDir).Child(newName)
	}

	// Check if destination file already exists
	// TODO: add an overwrite parameter to allow overwriting
	if dstEntry, err := fs.filer.FindEntry(ctx, finalDstPath); err != nil && err != filer_pb.ErrNotFound {
		err = fmt.Errorf("failed to check destination entry %s: %w", finalDstPath, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	} else if dstEntry != nil {
		err = fmt.Errorf("destination file %s already exists", finalDstPath)
		writeJsonError(w, r, http.StatusConflict, err)
		return
	}

	// Copy the file content and chunks
	newEntry, err := fs.copyEntry(ctx, srcEntry, finalDstPath, so)
	if err != nil {
		err = fmt.Errorf("failed to copy entry from '%s' to '%s': %w", src, dst, err)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}
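
	// Create the destination entry. Assuming Filer.CreateEntry's parameter
	// order here is (o_excl, isFromOtherCluster, signatures,
	// skipCreateParentDir, maxFilenameLength), o_excl=true also fails the
	// create if the destination appeared after the existence check above.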
	if createErr := fs.filer.CreateEntry(ctx, newEntry, true, false, nil, false, fs.filer.MaxFilenameLength); createErr != nil {
		err = fmt.Errorf("failed to create copied entry from '%s' to '%s': %w", src, dst, createErr)
		writeJsonError(w, r, http.StatusInternalServerError, err)
		return
	}

	glog.V(1).InfofCtx(ctx, "FilerServer.copy completed successfully: src='%s' -> dst='%s' (final_path='%s')", src, dst, finalDstPath)
	w.WriteHeader(http.StatusNoContent)
}

// copyEntry creates a new entry with copied content and chunks
func (fs *FilerServer) copyEntry(ctx context.Context, srcEntry *filer.Entry, dstPath util.FullPath, so *operation.StorageOption) (*filer.Entry, error) {
	// Create the base entry structure
	// Note: For hard links, we copy the actual content but NOT the HardLinkId/HardLinkCounter
	// This creates an independent copy rather than another hard link to the same content
	newEntry := &filer.Entry{
		FullPath: dstPath,
		// Deep copy Attr field to ensure slice independence (GroupNames, Md5)
		Attr: func(a filer.Attr) filer.Attr {
			a.GroupNames = append([]string(nil), a.GroupNames...)
			a.Md5 = append([]byte(nil), a.Md5...)
			return a
		}(srcEntry.Attr),
		Quota: srcEntry.Quota,
		// Intentionally NOT copying HardLinkId and HardLinkCounter to create independent copy
	}

	// Deep copy Extended fields to ensure independence
	if srcEntry.Extended != nil {
		newEntry.Extended = make(map[string][]byte, len(srcEntry.Extended))
		for k, v := range srcEntry.Extended {
			newEntry.Extended[k] = append([]byte(nil), v...)
		}
	}

	// Deep copy Remote field to ensure independence
	if srcEntry.Remote != nil {
		newEntry.Remote = &filer_pb.RemoteEntry{
			StorageName:       srcEntry.Remote.StorageName,
			LastLocalSyncTsNs: srcEntry.Remote.LastLocalSyncTsNs,
			RemoteETag:        srcEntry.Remote.RemoteETag,
			RemoteMtime:       srcEntry.Remote.RemoteMtime,
			RemoteSize:        srcEntry.Remote.RemoteSize,
		}
	}

	// Log if we're copying a hard link so we can track this behavior
	if len(srcEntry.HardLinkId) > 0 {
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copying hard link %s (nlink=%d) as independent file", srcEntry.FullPath, srcEntry.HardLinkCounter)
	}

	// Handle small files stored in Content field
	if len(srcEntry.Content) > 0 {
		// For small files, just copy the content directly
		newEntry.Content = make([]byte, len(srcEntry.Content))
		copy(newEntry.Content, srcEntry.Content)
		glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied content directly, size=%d", len(newEntry.Content))
		return newEntry, nil
	}

	// Handle files stored as chunks (including resolved hard link content)
	if len(srcEntry.GetChunks()) > 0 {
		srcChunks := srcEntry.GetChunks()

		// Create HTTP client once for reuse across all chunk operations
		client := &http.Client{Timeout: 60 * time.Second}

		// Check if any chunks are manifest chunks - these require special handling
		if filer.HasChunkManifest(srcChunks) {
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: handling manifest chunks")
			newChunks, err := fs.copyChunksWithManifest(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks with manifest: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied manifest chunks, count=%d", len(newChunks))
		} else {
			// Regular chunks without manifest - copy directly
			newChunks, err := fs.copyChunks(ctx, srcChunks, so, client)
			if err != nil {
				return nil, fmt.Errorf("failed to copy chunks: %w", err)
			}
			newEntry.Chunks = newChunks
			glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: copied regular chunks, count=%d", len(newChunks))
		}
		return newEntry, nil
	}

	// Empty file case (or hard link with no content - should not happen if hard link was properly resolved)
	if len(srcEntry.HardLinkId) > 0 {
		glog.WarningfCtx(ctx, "FilerServer.copyEntry: hard link %s appears to have no content - this may indicate an issue with hard link resolution", srcEntry.FullPath)
	}
	glog.V(2).InfofCtx(ctx, "FilerServer.copyEntry: empty file, no content or chunks to copy")
	return newEntry, nil
}

// copyChunks creates new chunks by copying data from source chunks using a parallel streaming approach
func (fs *FilerServer) copyChunks(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	// Optimize: Batch volume lookup for all chunks to reduce RPC calls
	volumeLocationsMap, err := fs.batchLookupVolumeLocations(ctx, srcChunks)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volume locations: %w", err)
	}

	// Parallel chunk copying with concurrency control using errgroup
	const maxConcurrentChunks = 8 // Match SeaweedFS standard for parallel operations

	// Pre-allocate result slice to maintain order
	newChunks := make([]*filer_pb.FileChunk, len(srcChunks))

	// Use errgroup for cleaner concurrency management
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(maxConcurrentChunks) // Limit concurrent goroutines
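	// SetLimit makes g.Go block once maxConcurrentChunks goroutines are in
	// flight, and gCtx is canceled on the first failure so remaining
	// in-flight copies can abort early.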

	// Validate that all chunk locations are available before starting any concurrent work
	for _, chunk := range srcChunks {
		volumeId := chunk.Fid.VolumeId
		locations, ok := volumeLocationsMap[volumeId]
		if !ok || len(locations) == 0 {
			return nil, fmt.Errorf("no locations found for volume %d", volumeId)
		}
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: starting parallel copy of %d chunks with max concurrency %d", len(srcChunks), maxConcurrentChunks)

	// Launch goroutines for each chunk
	for i, srcChunk := range srcChunks {
		// Capture loop variables for goroutine closure
		chunkIndex := i
		chunk := srcChunk
		chunkLocations := volumeLocationsMap[srcChunk.Fid.VolumeId]

		g.Go(func() error {
			glog.V(3).InfofCtx(gCtx, "FilerServer.copyChunks: copying chunk %d/%d, size=%d", chunkIndex+1, len(srcChunks), chunk.Size)

			// Use streaming copy to avoid loading entire chunk into memory
			newChunk, err := fs.streamCopyChunk(gCtx, chunk, so, client, chunkLocations)
			if err != nil {
				return fmt.Errorf("failed to copy chunk %d (%s): %w", chunkIndex+1, chunk.GetFileIdString(), err)
			}

			// Store result at correct index to maintain order
			newChunks[chunkIndex] = newChunk
			glog.V(4).InfofCtx(gCtx, "FilerServer.copyChunks: successfully copied chunk %d/%d", chunkIndex+1, len(srcChunks))
			return nil
		})
	}

	// Wait for all chunks to complete and return first error (if any)
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Verify all chunks were copied (shouldn't happen if no errors, but safety check)
	for i, chunk := range newChunks {
		if chunk == nil {
			return nil, fmt.Errorf("chunk %d was not copied (internal error)", i)
		}
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunks: successfully completed parallel copy of %d chunks", len(srcChunks))
	return newChunks, nil
}

// copyChunksWithManifest handles copying chunks that include manifest chunks
func (fs *FilerServer) copyChunksWithManifest(ctx context.Context, srcChunks []*filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) ([]*filer_pb.FileChunk, error) {
	if len(srcChunks) == 0 {
		return nil, nil
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing %d chunks (some are manifests)", len(srcChunks))

	// Separate manifest chunks from regular data chunks
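	// (A manifest chunk's payload is itself a serialized FileChunkManifest
	// listing further data chunks; manifests can nest, hence the recursion
	// below.)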
	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(srcChunks)

	var newChunks []*filer_pb.FileChunk

	// First, copy all non-manifest chunks directly
	if len(nonManifestChunks) > 0 {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: copying %d non-manifest chunks", len(nonManifestChunks))
		newNonManifestChunks, err := fs.copyChunks(ctx, nonManifestChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy non-manifest chunks: %w", err)
		}
		newChunks = append(newChunks, newNonManifestChunks...)
	}

	// Process each manifest chunk separately
	for i, manifestChunk := range manifestChunks {
		glog.V(3).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: processing manifest chunk %d/%d", i+1, len(manifestChunks))

		// Resolve the manifest chunk to get the actual data chunks it references
		lookupFileIdFn := func(ctx context.Context, fileId string) (urls []string, err error) {
			return fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
		}
		resolvedChunks, err := filer.ResolveOneChunkManifest(ctx, lookupFileIdFn, manifestChunk)
		if err != nil {
			return nil, fmt.Errorf("failed to resolve manifest chunk %s: %w", manifestChunk.GetFileIdString(), err)
		}
		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: resolved manifest chunk %s to %d data chunks",
			manifestChunk.GetFileIdString(), len(resolvedChunks))

		// Copy all the resolved data chunks (use recursive copyChunksWithManifest to handle nested manifests)
		newResolvedChunks, err := fs.copyChunksWithManifest(ctx, resolvedChunks, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to copy resolved chunks from manifest %s: %w", manifestChunk.GetFileIdString(), err)
		}

		// Create a new manifest chunk that references the copied data chunks
		newManifestChunk, err := fs.createManifestChunk(ctx, newResolvedChunks, manifestChunk, so, client)
		if err != nil {
			return nil, fmt.Errorf("failed to create new manifest chunk: %w", err)
		}
		newChunks = append(newChunks, newManifestChunk)
		glog.V(4).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: created new manifest chunk %s for %d resolved chunks",
			newManifestChunk.GetFileIdString(), len(newResolvedChunks))
	}

	glog.V(2).InfofCtx(ctx, "FilerServer.copyChunksWithManifest: completed copying %d total chunks (%d manifest, %d regular)",
		len(newChunks), len(manifestChunks), len(nonManifestChunks))
	return newChunks, nil
}

// createManifestChunk creates a new manifest chunk that references the provided data chunks
func (fs *FilerServer) createManifestChunk(ctx context.Context, dataChunks []*filer_pb.FileChunk, originalManifest *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client) (*filer_pb.FileChunk, error) {
	// Create the manifest data structure
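	// BeforeEntrySerialization converts each chunk's string FileId into its
	// structured Fid form before marshaling (mirroring how entries are
	// prepared for persistence), keeping the manifest references compact.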
	filer_pb.BeforeEntrySerialization(dataChunks)
	manifestData := &filer_pb.FileChunkManifest{
		Chunks: dataChunks,
	}

	// Serialize the manifest
	data, err := proto.Marshal(manifestData)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal manifest: %w", err)
	}

	// Save the manifest data as a new chunk
	saveFunc := func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
		// Assign a new file ID
		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
		if assignErr != nil {
			return nil, fmt.Errorf("failed to assign file ID for manifest: %w", assignErr)
		}

		// Upload the manifest data
		err = fs.uploadData(ctx, reader, urlLocation, string(auth), client)
		if err != nil {
			return nil, fmt.Errorf("failed to upload manifest data: %w", err)
		}

		// Create the chunk metadata
		chunk = &filer_pb.FileChunk{
			FileId: fileId,
			Offset: offset,
			Size:   uint64(len(data)),
		}
		return chunk, nil
	}

	manifestChunk, err := saveFunc(bytes.NewReader(data), "", originalManifest.Offset, 0)
	if err != nil {
		return nil, fmt.Errorf("failed to save manifest chunk: %w", err)
	}

	// Set manifest-specific properties
	manifestChunk.IsChunkManifest = true
	manifestChunk.Size = originalManifest.Size
	return manifestChunk, nil
}

// uploadData uploads data to a volume server
func (fs *FilerServer) uploadData(ctx context.Context, reader io.Reader, urlLocation, auth string, client *http.Client) error {
	req, err := http.NewRequestWithContext(ctx, "PUT", urlLocation, reader)
	if err != nil {
		return fmt.Errorf("failed to create upload request: %w", err)
	}
	if auth != "" {
		req.Header.Set("Authorization", "Bearer "+auth)
	}

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to upload data: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			return fmt.Errorf("upload failed with status %d, and failed to read response: %w", resp.StatusCode, readErr)
		}
		return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(body))
	}
	return nil
}

// batchLookupVolumeLocations performs a single batched lookup for all unique volume IDs in the chunks
func (fs *FilerServer) batchLookupVolumeLocations(ctx context.Context, chunks []*filer_pb.FileChunk) (map[uint32][]operation.Location, error) {
	// Collect unique volume IDs and their string representations to avoid repeated conversions
	volumeIdMap := make(map[uint32]string)
	for _, chunk := range chunks {
		vid := chunk.Fid.VolumeId
		if _, found := volumeIdMap[vid]; !found {
			volumeIdMap[vid] = fmt.Sprintf("%d", vid)
		}
	}

	if len(volumeIdMap) == 0 {
		return make(map[uint32][]operation.Location), nil
	}

	// Convert to slice of strings for the lookup call
	volumeIdStrs := make([]string, 0, len(volumeIdMap))
	for _, vidStr := range volumeIdMap {
		volumeIdStrs = append(volumeIdStrs, vidStr)
	}

	// Perform single batched lookup
	lookupResult, err := operation.LookupVolumeIds(fs.filer.GetMaster, fs.grpcDialOption, volumeIdStrs)
	if err != nil {
		return nil, fmt.Errorf("failed to lookup volumes: %w", err)
	}

	// Convert result to map of volumeId -> locations
	volumeLocationsMap := make(map[uint32][]operation.Location)
	for volumeId, volumeIdStr := range volumeIdMap {
		if volumeLocations, ok := lookupResult[volumeIdStr]; ok && len(volumeLocations.Locations) > 0 {
			volumeLocationsMap[volumeId] = volumeLocations.Locations
		}
	}
	return volumeLocationsMap, nil
}

// streamCopyChunk copies a chunk using streaming to minimize memory usage
func (fs *FilerServer) streamCopyChunk(ctx context.Context, srcChunk *filer_pb.FileChunk, so *operation.StorageOption, client *http.Client, locations []operation.Location) (*filer_pb.FileChunk, error) {
	// Assign a new file ID for destination
	fileId, urlLocation, auth, err := fs.assignNewFileInfo(ctx, so)
	if err != nil {
		return nil, fmt.Errorf("failed to assign new file ID: %w", err)
	}

	// Try all available locations for source chunk until one succeeds
	fileIdString := srcChunk.GetFileIdString()
	var lastErr error
	for i, location := range locations {
		srcUrl := fmt.Sprintf("http://%s/%s", location.Url, fileIdString)
		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: attempting streaming copy from %s to %s (attempt %d/%d)", srcUrl, urlLocation, i+1, len(locations))

		// Perform streaming copy using HTTP client
		err := fs.performStreamCopy(ctx, srcUrl, urlLocation, string(auth), srcChunk.Size, client)
		if err != nil {
			lastErr = err
			glog.V(2).InfofCtx(ctx, "FilerServer.streamCopyChunk: failed streaming copy from %s: %v", srcUrl, err)
			continue
		}

		// Success - create chunk metadata
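		// Offset, Size, and ETag carry over unchanged: the copy is
		// byte-identical to the source chunk, and the ETag is derived from
		// the chunk content.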
		newChunk := &filer_pb.FileChunk{
			FileId: fileId,
			Offset: srcChunk.Offset,
			Size:   srcChunk.Size,
			ETag:   srcChunk.ETag,
		}
		glog.V(4).InfofCtx(ctx, "FilerServer.streamCopyChunk: successfully streamed %d bytes", srcChunk.Size)
		return newChunk, nil
	}

	// All locations failed
	return nil, fmt.Errorf("failed to stream copy chunk from any location: %w", lastErr)
}

// performStreamCopy performs the actual streaming copy from source URL to destination URL
func (fs *FilerServer) performStreamCopy(ctx context.Context, srcUrl, dstUrl, auth string, expectedSize uint64, client *http.Client) error {
	// Create HTTP request to read from source
	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
	if err != nil {
		return fmt.Errorf("failed to create source request: %w", err)
	}

	// Perform source request
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to read from source: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("source returned status %d", resp.StatusCode)
	}

	// Create HTTP request to write to destination
	dstReq, err := http.NewRequestWithContext(ctx, "PUT", dstUrl, resp.Body)
	if err != nil {
		return fmt.Errorf("failed to create destination request: %w", err)
	}
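
	// Set ContentLength explicitly: resp.Body has no intrinsic length, so
	// without it Go would fall back to chunked transfer encoding, which the
	// volume server's upload path may not accept; the explicit size also
	// lets the receiver validate the payload.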
	dstReq.ContentLength = int64(expectedSize)

	// Set authorization header if provided
	if auth != "" {
		dstReq.Header.Set("Authorization", "Bearer "+auth)
	}
	dstReq.Header.Set("Content-Type", "application/octet-stream")

	// Perform destination request
	dstResp, err := client.Do(dstReq)
	if err != nil {
		return fmt.Errorf("failed to write to destination: %w", err)
	}
	defer dstResp.Body.Close()

	if dstResp.StatusCode != http.StatusCreated && dstResp.StatusCode != http.StatusOK {
		// Read error response body for more details
		body, readErr := io.ReadAll(dstResp.Body)
		if readErr != nil {
			return fmt.Errorf("destination returned status %d, and failed to read body: %w", dstResp.StatusCode, readErr)
		}
		return fmt.Errorf("destination returned status %d: %s", dstResp.StatusCode, string(body))
	}

	glog.V(4).InfofCtx(ctx, "FilerServer.performStreamCopy: successfully streamed data from %s to %s", srcUrl, dstUrl)
	return nil
}