// command_remote_cache.go
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
// init registers the remote.cache command with the shell's global command list.
func init() {
	Commands = append(Commands, &commandRemoteCache{})
}
// commandRemoteCache implements the "remote.cache" shell command.
// It carries no state, so the zero value is ready to use.
type commandRemoteCache struct {
}
// Name returns the shell command name used to invoke this command.
func (c *commandRemoteCache) Name() string {
	return "remote.cache"
}
// Help returns the usage text displayed by the shell's help system.
func (c *commandRemoteCache) Help() string {
	return `comprehensive synchronization and caching between local and remote storage
# assume a remote storage is configured to name "cloud1"
remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
# mount and pull one bucket
remote.mount -dir=/xxx -remote=cloud1/bucket
# comprehensive sync and cache: update metadata, cache content, and remove deleted files
remote.cache -dir=/xxx # sync metadata, cache content, and remove deleted files (default)
remote.cache -dir=/xxx -cacheContent=false # sync metadata and cleanup only, no caching
remote.cache -dir=/xxx -deleteLocalExtra=false # skip removal of local files missing from remote
remote.cache -dir=/xxx -concurrent=32 # with custom concurrency
remote.cache -dir=/xxx -include=*.pdf # only sync PDF files
remote.cache -dir=/xxx -exclude=*.tmp # exclude temporary files
remote.cache -dir=/xxx -dryRun=true # show what would be done without making changes
This command will:
1. Synchronize metadata from remote storage
2. Cache file content from remote by default
3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)
This is designed to run regularly. So you can add it to some cronjob.
`
}
// HasTag reports whether this command carries the given tag; remote.cache has none.
func (c *commandRemoteCache) HasTag(CommandTag) bool {
	return false
}
  46. func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  47. remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  48. dir := remoteCacheCommand.String("dir", "", "a directory in filer")
  49. cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
  50. deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
  51. concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
  52. dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
  53. fileFiler := newFileFilter(remoteCacheCommand)
  54. if err = remoteCacheCommand.Parse(args); err != nil {
  55. return nil
  56. }
  57. if *dir == "" {
  58. return fmt.Errorf("need to specify -dir option")
  59. }
  60. mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
  61. if detectErr != nil {
  62. jsonPrintln(writer, mappings)
  63. return detectErr
  64. }
  65. // perform comprehensive sync
  66. return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler)
  67. }
  68. func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {
  69. // visit remote storage
  70. remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
  71. if err != nil {
  72. return err
  73. }
  74. remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
  75. // Step 1: Collect all remote files
  76. remoteFiles := make(map[string]*filer_pb.RemoteEntry)
  77. err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  78. localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
  79. fullPath := string(localDir.Child(name))
  80. remoteFiles[fullPath] = remoteEntry
  81. return nil
  82. })
  83. if err != nil {
  84. return fmt.Errorf("failed to traverse remote storage: %w", err)
  85. }
  86. fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
  87. // Step 2: Collect all local files (only if we need to delete local extra files)
  88. localFiles := make(map[string]*filer_pb.Entry)
  89. if deleteLocalExtra {
  90. err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
  91. if entry.RemoteEntry != nil { // only consider files that are part of remote mount
  92. fullPath := string(dir.Child(entry.Name))
  93. localFiles[fullPath] = entry
  94. }
  95. return true
  96. })
  97. if err != nil {
  98. return fmt.Errorf("failed to traverse local directory: %w", err)
  99. }
  100. fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
  101. } else {
  102. fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
  103. }
  104. // Step 3: Determine actions needed
  105. var filesToDelete []string
  106. var filesToUpdate []string
  107. var filesToCache []string
  108. // Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
  109. if deleteLocalExtra {
  110. for localPath := range localFiles {
  111. if _, exists := remoteFiles[localPath]; !exists {
  112. filesToDelete = append(filesToDelete, localPath)
  113. }
  114. }
  115. }
  116. // Find files to update/cache (exist remotely)
  117. for remotePath, remoteEntry := range remoteFiles {
  118. if deleteLocalExtra {
  119. // When deleteLocalExtra is enabled, we have localFiles to compare with
  120. if localEntry, exists := localFiles[remotePath]; exists {
  121. // File exists locally, check if it needs updating
  122. if localEntry.RemoteEntry == nil ||
  123. localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
  124. localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
  125. filesToUpdate = append(filesToUpdate, remotePath)
  126. }
  127. // Check if it needs caching
  128. if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
  129. filesToCache = append(filesToCache, remotePath)
  130. }
  131. } else {
  132. // File doesn't exist locally, needs to be created
  133. filesToUpdate = append(filesToUpdate, remotePath)
  134. }
  135. } else {
  136. // When deleteLocalExtra is disabled, we check each file individually
  137. // All remote files are candidates for update/creation
  138. filesToUpdate = append(filesToUpdate, remotePath)
  139. // For caching, we need to check if the local file exists and needs caching
  140. if shouldCache {
  141. // We need to look up the local file to check if it needs caching
  142. localDir, name := util.FullPath(remotePath).DirAndName()
  143. err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  144. lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
  145. Directory: localDir,
  146. Name: name,
  147. })
  148. if lookupErr == nil {
  149. localEntry := lookupResp.Entry
  150. if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
  151. filesToCache = append(filesToCache, remotePath)
  152. }
  153. }
  154. return nil // Don't propagate lookup errors here
  155. })
  156. if err != nil {
  157. // Log error but continue
  158. fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
  159. }
  160. }
  161. }
  162. }
  163. fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
  164. len(filesToDelete), len(filesToUpdate), len(filesToCache))
  165. if dryRun {
  166. fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
  167. for _, path := range filesToDelete {
  168. fmt.Fprintf(writer, "DELETE: %s\n", path)
  169. }
  170. for _, path := range filesToUpdate {
  171. fmt.Fprintf(writer, "UPDATE: %s\n", path)
  172. }
  173. for _, path := range filesToCache {
  174. fmt.Fprintf(writer, "CACHE: %s\n", path)
  175. }
  176. return nil
  177. }
  178. // Step 4: Execute actions
  179. return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  180. ctx := context.Background()
  181. // Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
  182. if deleteLocalExtra {
  183. for _, pathToDelete := range filesToDelete {
  184. fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)
  185. dir, name := util.FullPath(pathToDelete).DirAndName()
  186. _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
  187. Directory: dir,
  188. Name: name,
  189. IgnoreRecursiveError: false,
  190. IsDeleteData: true,
  191. IsRecursive: false,
  192. IsFromOtherCluster: false,
  193. })
  194. if err != nil {
  195. fmt.Fprintf(writer, "failed: %v\n", err)
  196. return err
  197. }
  198. fmt.Fprintf(writer, "done\n")
  199. }
  200. }
  201. // Update metadata for files that exist on remote
  202. for _, pathToUpdate := range filesToUpdate {
  203. remoteEntry := remoteFiles[pathToUpdate]
  204. localDir, name := util.FullPath(pathToUpdate).DirAndName()
  205. fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)
  206. // Check if file exists locally
  207. lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
  208. Directory: string(localDir),
  209. Name: name,
  210. })
  211. if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
  212. fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
  213. continue
  214. }
  215. isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
  216. if lookupErr == filer_pb.ErrNotFound {
  217. // Create new entry
  218. _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
  219. Directory: string(localDir),
  220. Entry: &filer_pb.Entry{
  221. Name: name,
  222. IsDirectory: isDirectory,
  223. Attributes: &filer_pb.FuseAttributes{
  224. FileSize: uint64(remoteEntry.RemoteSize),
  225. Mtime: remoteEntry.RemoteMtime,
  226. FileMode: uint32(0644),
  227. },
  228. RemoteEntry: remoteEntry,
  229. },
  230. })
  231. if createErr != nil {
  232. fmt.Fprintf(writer, "failed to create: %v\n", createErr)
  233. continue
  234. }
  235. } else {
  236. // Update existing entry
  237. existingEntry := lookupResp.Entry
  238. if existingEntry.RemoteEntry == nil {
  239. // This is a local file, skip to avoid overwriting
  240. fmt.Fprintf(writer, "skipped (local file)\n")
  241. continue
  242. }
  243. existingEntry.RemoteEntry = remoteEntry
  244. existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
  245. existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
  246. existingEntry.Attributes.Md5 = nil
  247. existingEntry.Chunks = nil
  248. existingEntry.Content = nil
  249. _, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
  250. Directory: string(localDir),
  251. Entry: existingEntry,
  252. })
  253. if updateErr != nil {
  254. fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
  255. continue
  256. }
  257. }
  258. fmt.Fprintf(writer, "done\n")
  259. }
  260. // Cache file content if requested
  261. if shouldCache && len(filesToCache) > 0 {
  262. fmt.Fprintf(writer, "Caching file content...\n")
  263. var wg sync.WaitGroup
  264. limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
  265. var executionErr error
  266. for _, pathToCache := range filesToCache {
  267. wg.Add(1)
  268. pathToCacheCopy := pathToCache // Capture for closure
  269. limitedConcurrentExecutor.Execute(func() {
  270. defer wg.Done()
  271. // Get local entry (either from localFiles map or by lookup)
  272. var localEntry *filer_pb.Entry
  273. if deleteLocalExtra {
  274. localEntry = localFiles[pathToCacheCopy]
  275. if localEntry == nil {
  276. fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
  277. return
  278. }
  279. } else {
  280. // Look up the local entry since we don't have it in localFiles
  281. localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
  282. lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  283. lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
  284. Directory: localDir,
  285. Name: name,
  286. })
  287. if err == nil {
  288. localEntry = lookupResp.Entry
  289. }
  290. return err
  291. })
  292. if lookupErr != nil {
  293. fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
  294. return
  295. }
  296. }
  297. dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
  298. remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))
  299. fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
  300. if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
  301. fmt.Fprintf(writer, "failed: %v\n", err)
  302. if executionErr == nil {
  303. executionErr = err
  304. }
  305. return
  306. }
  307. fmt.Fprintf(writer, "done\n")
  308. })
  309. }
  310. wg.Wait()
  311. if executionErr != nil {
  312. return executionErr
  313. }
  314. }
  315. return nil
  316. })
  317. }
  318. func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
  319. err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  320. if entry.IsDirectory {
  321. if !visitEntry(dirPath, entry) {
  322. return nil
  323. }
  324. subDir := dirPath.Child(entry.Name)
  325. if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
  326. return err
  327. }
  328. } else {
  329. if !visitEntry(dirPath, entry) {
  330. return nil
  331. }
  332. }
  333. return nil
  334. })
  335. return
  336. }
  337. func shouldCacheToLocal(entry *filer_pb.Entry) bool {
  338. if entry.IsDirectory {
  339. return false
  340. }
  341. if entry.RemoteEntry == nil {
  342. return false
  343. }
  344. if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
  345. return true
  346. }
  347. return false
  348. }
  349. func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
  350. if entry.IsDirectory {
  351. return false
  352. }
  353. if entry.RemoteEntry == nil {
  354. return false // should not uncache an entry that is not in remote
  355. }
  356. if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
  357. return true
  358. }
  359. return false
  360. }