// replication_util.go
package repl_util

import (
	"context"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
  9. func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
  10. for x := chunkViews.Front(); x != nil; x = x.Next {
  11. chunk := x.Value
  12. fileUrls, err := filerSource.LookupFileId(context.Background(), chunk.FileId)
  13. if err != nil {
  14. return err
  15. }
  16. var writeErr error
  17. var shouldRetry bool
  18. for _, fileUrl := range fileUrls {
  19. shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
  20. writeErr = writeFunc(data)
  21. })
  22. if err != nil {
  23. glog.V(1).Infof("read from %s: %v", fileUrl, err)
  24. } else if writeErr != nil {
  25. glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
  26. } else {
  27. break
  28. }
  29. }
  30. if shouldRetry && err != nil {
  31. return err
  32. }
  33. if writeErr != nil {
  34. return writeErr
  35. }
  36. }
  37. return nil
  38. }