azure_sink.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package azuresink
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  11. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  12. "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
  13. "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
  14. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  15. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
  16. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
  17. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  21. "github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
  22. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  23. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  24. "github.com/seaweedfs/seaweedfs/weed/util"
  25. )
  26. type AzureSink struct {
  27. client *azblob.Client
  28. container string
  29. dir string
  30. filerSource *source.FilerSource
  31. isIncremental bool
  32. }
  33. func init() {
  34. sink.Sinks = append(sink.Sinks, &AzureSink{})
  35. }
  36. func (g *AzureSink) GetName() string {
  37. return "azure"
  38. }
  39. func (g *AzureSink) GetSinkToDirectory() string {
  40. return g.dir
  41. }
  42. func (g *AzureSink) IsIncremental() bool {
  43. return g.isIncremental
  44. }
  45. func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
  46. g.isIncremental = configuration.GetBool(prefix + "is_incremental")
  47. return g.initialize(
  48. configuration.GetString(prefix+"account_name"),
  49. configuration.GetString(prefix+"account_key"),
  50. configuration.GetString(prefix+"container"),
  51. configuration.GetString(prefix+"directory"),
  52. )
  53. }
  54. func (g *AzureSink) SetSourceFiler(s *source.FilerSource) {
  55. g.filerSource = s
  56. }
  57. func (g *AzureSink) initialize(accountName, accountKey, container, dir string) error {
  58. g.container = container
  59. g.dir = dir
  60. // Create credential and client
  61. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
  62. if err != nil {
  63. return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, err)
  64. }
  65. serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
  66. client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
  67. ClientOptions: azcore.ClientOptions{
  68. Retry: policy.RetryOptions{
  69. MaxRetries: 10, // Increased from default 3 for replication sink resiliency
  70. TryTimeout: time.Minute,
  71. RetryDelay: 2 * time.Second,
  72. MaxRetryDelay: time.Minute,
  73. },
  74. },
  75. })
  76. if err != nil {
  77. return fmt.Errorf("failed to create Azure client: %w", err)
  78. }
  79. g.client = client
  80. return nil
  81. }
  82. func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
  83. key = cleanKey(key)
  84. if isDirectory {
  85. key = key + "/"
  86. }
  87. blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
  88. _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
  89. DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
  90. })
  91. if err != nil {
  92. // Make delete idempotent - don't return error if blob doesn't exist
  93. if bloberror.HasCode(err, bloberror.BlobNotFound) {
  94. return nil
  95. }
  96. return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err)
  97. }
  98. return nil
  99. }
  100. func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
  101. key = cleanKey(key)
  102. if entry.IsDirectory {
  103. return nil
  104. }
  105. totalSize := filer.FileSize(entry)
  106. chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
  107. // Create append blob client
  108. appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
  109. // Create blob with access conditions
  110. accessConditions := &blob.AccessConditions{}
  111. if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
  112. modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
  113. accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
  114. IfUnmodifiedSince: &modifiedTime,
  115. }
  116. }
  117. _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
  118. AccessConditions: accessConditions,
  119. })
  120. if err != nil {
  121. if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
  122. // Blob already exists, which is fine for an append blob - we can append to it
  123. } else {
  124. // Check if this is a precondition failed error (HTTP 412)
  125. var respErr *azcore.ResponseError
  126. if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
  127. glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
  128. return nil
  129. }
  130. return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
  131. }
  132. }
  133. writeFunc := func(data []byte) error {
  134. _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
  135. return writeErr
  136. }
  137. if len(entry.Content) > 0 {
  138. return writeFunc(entry.Content)
  139. }
  140. if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
  141. return err
  142. }
  143. return nil
  144. }
  145. func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
  146. key = cleanKey(key)
  147. return true, g.CreateEntry(key, newEntry, signatures)
  148. }
  // cleanKey converts a filer path into a blob name by removing at most one
  // leading "/". Keys without a leading slash, including the empty string, are
  // returned unchanged.
  func cleanKey(key string) string {
  	return strings.TrimPrefix(key, "/")
  }