filer_remote_gateway_buckets.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package command
import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"path/filepath"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)
  20. func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
  21. // read filer remote storage mount mappings
  22. if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
  23. return fmt.Errorf("read mount info: %w", detectErr)
  24. }
  25. eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
  26. if err != nil {
  27. return err
  28. }
  29. lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
  30. processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
  31. var lastLogTsNs = time.Now().UnixNano()
  32. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  33. processor.AddSyncJob(resp)
  34. return nil
  35. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  36. offsetTsNs := processor.processedTsWatermark.Load()
  37. if offsetTsNs == 0 {
  38. return nil
  39. }
  40. now := time.Now().UnixNano()
  41. glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  42. lastLogTsNs = now
  43. return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs)
  44. })
  45. option.clientEpoch++
  46. metadataFollowOption := &pb.MetadataFollowOption{
  47. ClientName: "filer.remote.sync",
  48. ClientId: option.clientId,
  49. ClientEpoch: option.clientEpoch,
  50. SelfSignature: 0,
  51. PathPrefix: option.bucketsDir + "/",
  52. AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
  53. DirectoriesToWatch: nil,
  54. StartTsNs: lastOffsetTs.UnixNano(),
  55. StopTsNs: 0,
  56. EventErrorType: pb.RetryForeverOnError,
  57. }
  58. return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  59. }
  60. func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
  61. handleCreateBucket := func(entry *filer_pb.Entry) error {
  62. if !entry.IsDirectory {
  63. return nil
  64. }
  65. if entry.RemoteEntry != nil {
  66. // this directory is imported from "remote.mount.buckets" or "remote.mount"
  67. return nil
  68. }
  69. if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
  70. *option.createBucketAt = option.mappings.PrimaryBucketStorageName
  71. glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
  72. }
  73. if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
  74. for k := range option.mappings.Mappings {
  75. *option.createBucketAt = k
  76. glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
  77. }
  78. }
  79. if *option.createBucketAt == "" {
  80. return nil
  81. }
  82. remoteConf, found := option.remoteConfs[*option.createBucketAt]
  83. if !found {
  84. return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
  85. }
  86. client, err := remote_storage.GetRemoteStorage(remoteConf)
  87. if err != nil {
  88. return err
  89. }
  90. bucketName := strings.ToLower(entry.Name)
  91. if *option.include != "" {
  92. if ok, _ := filepath.Match(*option.include, entry.Name); !ok {
  93. return nil
  94. }
  95. }
  96. if *option.exclude != "" {
  97. if ok, _ := filepath.Match(*option.exclude, entry.Name); ok {
  98. return nil
  99. }
  100. }
  101. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  102. remoteLocation, found := option.mappings.Mappings[string(bucketPath)]
  103. if !found {
  104. if *option.createBucketRandomSuffix {
  105. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
  106. if len(bucketName)+5 > 63 {
  107. bucketName = bucketName[:58]
  108. }
  109. bucketName = fmt.Sprintf("%s-%04d", bucketName, rand.Uint32()%10000)
  110. }
  111. remoteLocation = &remote_pb.RemoteStorageLocation{
  112. Name: *option.createBucketAt,
  113. Bucket: bucketName,
  114. Path: "/",
  115. }
  116. // need to add new mapping here before getting updates from metadata tailing
  117. option.mappings.Mappings[string(bucketPath)] = remoteLocation
  118. } else {
  119. bucketName = remoteLocation.Bucket
  120. }
  121. glog.V(0).Infof("create bucket %s", bucketName)
  122. if err := client.CreateBucket(bucketName); err != nil {
  123. return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
  124. }
  125. return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
  126. }
  127. handleDeleteBucket := func(entry *filer_pb.Entry) error {
  128. if !entry.IsDirectory {
  129. return nil
  130. }
  131. client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
  132. if err != nil {
  133. return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
  134. }
  135. glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
  136. if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
  137. return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
  138. }
  139. bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
  140. return filer.DeleteMountMapping(option, string(bucketPath))
  141. }
  142. handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
  143. message := resp.EventNotification
  144. if message.NewEntry != nil {
  145. // update
  146. if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
  147. newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
  148. if readErr != nil {
  149. return fmt.Errorf("unmarshal mappings: %w", readErr)
  150. }
  151. option.mappings = newMappings
  152. }
  153. if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  154. conf := &remote_pb.RemoteConf{}
  155. if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
  156. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
  157. }
  158. option.remoteConfs[conf.Name] = conf
  159. }
  160. } else if message.OldEntry != nil {
  161. // deletion
  162. if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  163. conf := &remote_pb.RemoteConf{}
  164. if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
  165. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
  166. }
  167. delete(option.remoteConfs, conf.Name)
  168. }
  169. }
  170. return nil
  171. }
  172. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  173. message := resp.EventNotification
  174. if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
  175. return handleEtcRemoteChanges(resp)
  176. }
  177. if filer_pb.IsEmpty(resp) {
  178. return nil
  179. }
  180. if filer_pb.IsCreate(resp) {
  181. if message.NewParentPath == option.bucketsDir {
  182. return handleCreateBucket(message.NewEntry)
  183. }
  184. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  185. return nil
  186. }
  187. if !filer.HasData(message.NewEntry) {
  188. return nil
  189. }
  190. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
  191. if !ok {
  192. return nil
  193. }
  194. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  195. if err != nil {
  196. return err
  197. }
  198. glog.V(2).Infof("create: %+v", resp)
  199. if !shouldSendToRemote(message.NewEntry) {
  200. glog.V(2).Infof("skipping creating: %+v", resp)
  201. return nil
  202. }
  203. dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
  204. if message.NewEntry.IsDirectory {
  205. glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
  206. return client.WriteDirectory(dest, message.NewEntry)
  207. }
  208. glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
  209. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, dest)
  210. if writeErr != nil {
  211. return writeErr
  212. }
  213. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  214. }
  215. if filer_pb.IsDelete(resp) {
  216. if resp.Directory == option.bucketsDir {
  217. return handleDeleteBucket(message.OldEntry)
  218. }
  219. bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
  220. if !ok {
  221. return nil
  222. }
  223. client, err := remote_storage.GetRemoteStorage(remoteStorage)
  224. if err != nil {
  225. return err
  226. }
  227. glog.V(2).Infof("delete: %+v", resp)
  228. dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
  229. if message.OldEntry.IsDirectory {
  230. glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
  231. return client.RemoveDirectory(dest)
  232. }
  233. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
  234. return client.DeleteFile(dest)
  235. }
  236. if message.OldEntry != nil && message.NewEntry != nil {
  237. if resp.Directory == option.bucketsDir {
  238. if message.NewParentPath == option.bucketsDir {
  239. if message.OldEntry.Name == message.NewEntry.Name {
  240. return nil
  241. }
  242. if err := handleCreateBucket(message.NewEntry); err != nil {
  243. return err
  244. }
  245. if err := handleDeleteBucket(message.OldEntry); err != nil {
  246. return err
  247. }
  248. }
  249. }
  250. if isMultipartUploadFile(message.NewParentPath, message.NewEntry.Name) {
  251. return nil
  252. }
  253. oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
  254. newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
  255. if oldOk && newOk {
  256. if !shouldSendToRemote(message.NewEntry) {
  257. glog.V(2).Infof("skipping updating: %+v", resp)
  258. return nil
  259. }
  260. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  261. if err != nil {
  262. return err
  263. }
  264. if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
  265. // update the same entry
  266. if message.NewEntry.IsDirectory {
  267. // update directory property
  268. return nil
  269. }
  270. if message.OldEntry.RemoteEntry != nil && filer.IsSameData(message.OldEntry, message.NewEntry) {
  271. glog.V(2).Infof("update meta: %+v", resp)
  272. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  273. return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
  274. } else {
  275. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  276. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  277. if writeErr != nil {
  278. return writeErr
  279. }
  280. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  281. }
  282. }
  283. }
  284. // the following is entry rename
  285. if oldOk {
  286. client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
  287. if err != nil {
  288. return err
  289. }
  290. oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
  291. if message.OldEntry.IsDirectory {
  292. return client.RemoveDirectory(oldDest)
  293. }
  294. glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
  295. if err := client.DeleteFile(oldDest); err != nil {
  296. return err
  297. }
  298. }
  299. if newOk {
  300. if !shouldSendToRemote(message.NewEntry) {
  301. glog.V(2).Infof("skipping updating: %+v", resp)
  302. return nil
  303. }
  304. client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
  305. if err != nil {
  306. return err
  307. }
  308. newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
  309. if message.NewEntry.IsDirectory {
  310. return client.WriteDirectory(newDest, message.NewEntry)
  311. }
  312. remoteEntry, writeErr := retriedWriteFile(client, filerSource, message.NewEntry, newDest)
  313. if writeErr != nil {
  314. return writeErr
  315. }
  316. return updateLocalEntry(option, message.NewParentPath, message.NewEntry, remoteEntry)
  317. }
  318. }
  319. return nil
  320. }
  321. return eachEntryFunc, nil
  322. }
  323. func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
  324. bucket := util.FullPath(option.bucketsDir).Child(bucketName)
  325. var isMounted bool
  326. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  327. if !isMounted {
  328. return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
  329. }
  330. remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
  331. if !hasClient {
  332. return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  333. }
  334. client, err = remote_storage.GetRemoteStorage(remoteConf)
  335. if err != nil {
  336. return nil, remoteStorageMountLocation, err
  337. }
  338. return client, remoteStorageMountLocation, nil
  339. }
  340. func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
  341. bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
  342. if !ok {
  343. return "", nil, nil, false
  344. }
  345. var isMounted bool
  346. remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
  347. if !isMounted {
  348. glog.Warningf("%s is not mounted", bucket)
  349. return "", nil, nil, false
  350. }
  351. var hasClient bool
  352. remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
  353. if !hasClient {
  354. glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
  355. return "", nil, nil, false
  356. }
  357. return bucket, remoteStorageMountLocation, remoteConf, true
  358. }
  359. func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
  360. if !strings.HasPrefix(dir, bucketsDir+"/") {
  361. return "", false
  362. }
  363. parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
  364. return util.FullPath(bucketsDir).Child(parts[0]), true
  365. }
  366. func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) {
  367. if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
  368. if err == filer_pb.ErrNotFound {
  369. return fmt.Errorf("remote storage is not configured in filer server")
  370. }
  371. return err
  372. } else {
  373. option.mappings = mappings
  374. }
  375. option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
  376. var lastConfName string
  377. err = filer_pb.List(context.Background(), option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
  378. if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
  379. return nil
  380. }
  381. conf := &remote_pb.RemoteConf{}
  382. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  383. return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
  384. }
  385. option.remoteConfs[conf.Name] = conf
  386. lastConfName = conf.Name
  387. return nil
  388. }, "", false, math.MaxUint32)
  389. if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
  390. glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
  391. option.mappings.PrimaryBucketStorageName = lastConfName
  392. }
  393. return
  394. }