| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- package shell
- import (
- "context"
- "flag"
- "fmt"
- "io"
- "sync"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
- "github.com/seaweedfs/seaweedfs/weed/remote_storage"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
// init registers the remote.cache command with the shell's global command list.
func init() {
	Commands = append(Commands, &commandRemoteCache{})
}
// commandRemoteCache implements the "remote.cache" shell command, which
// synchronizes metadata, caches content, and prunes deleted files for a
// remote-mounted directory. It is stateless; all options come from flags.
type commandRemoteCache struct {
}
// Name returns the shell command name used to invoke this command.
func (c *commandRemoteCache) Name() string {
	return "remote.cache"
}
// Help returns the usage text shown by the shell's help system.
func (c *commandRemoteCache) Help() string {
	return `comprehensive synchronization and caching between local and remote storage

	# assume a remote storage is configured to name "cloud1"
	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
	# mount and pull one bucket
	remote.mount -dir=/xxx -remote=cloud1/bucket

	# comprehensive sync and cache: update metadata, cache content, and remove deleted files
	remote.cache -dir=/xxx                          # sync metadata, cache content, and remove deleted files (default)
	remote.cache -dir=/xxx -cacheContent=false      # sync metadata and cleanup only, no caching
	remote.cache -dir=/xxx -deleteLocalExtra=false  # skip removal of local files missing from remote
	remote.cache -dir=/xxx -concurrent=32           # with custom concurrency
	remote.cache -dir=/xxx -include=*.pdf           # only sync PDF files
	remote.cache -dir=/xxx -exclude=*.tmp           # exclude temporary files
	remote.cache -dir=/xxx -dryRun=true             # show what would be done without making changes

	This command will:
	1. Synchronize metadata from remote storage
	2. Cache file content from remote by default
	3. Remove local files that no longer exist on remote by default (use -deleteLocalExtra=false to disable)

	This is designed to run regularly. So you can add it to some cronjob.
`
}
// HasTag reports whether the command carries the given tag; remote.cache has none.
func (c *commandRemoteCache) HasTag(CommandTag) bool {
	return false
}
- func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- remoteCacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- dir := remoteCacheCommand.String("dir", "", "a directory in filer")
- cache := remoteCacheCommand.Bool("cacheContent", true, "cache file content from remote")
- deleteLocalExtra := remoteCacheCommand.Bool("deleteLocalExtra", true, "delete local files that no longer exist on remote")
- concurrency := remoteCacheCommand.Int("concurrent", 16, "concurrent file operations")
- dryRun := remoteCacheCommand.Bool("dryRun", false, "show what would be done without making changes")
- fileFiler := newFileFilter(remoteCacheCommand)
- if err = remoteCacheCommand.Parse(args); err != nil {
- return nil
- }
- if *dir == "" {
- return fmt.Errorf("need to specify -dir option")
- }
- mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, *dir)
- if detectErr != nil {
- jsonPrintln(writer, mappings)
- return detectErr
- }
- // perform comprehensive sync
- return c.doComprehensiveSync(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(*dir), remoteStorageConf, *cache, *deleteLocalExtra, *concurrency, *dryRun, fileFiler)
- }
- func (c *commandRemoteCache) doComprehensiveSync(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToSync util.FullPath, remoteConf *remote_pb.RemoteConf, shouldCache bool, deleteLocalExtra bool, concurrency int, dryRun bool, fileFilter *FileFilter) error {
- // visit remote storage
- remoteStorage, err := remote_storage.GetRemoteStorage(remoteConf)
- if err != nil {
- return err
- }
- remote := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dirToSync)
- // Step 1: Collect all remote files
- remoteFiles := make(map[string]*filer_pb.RemoteEntry)
- err = remoteStorage.Traverse(remote, func(remoteDir, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
- localDir := filer.MapRemoteStorageLocationPathToFullPath(localMountedDir, remoteMountedLocation, remoteDir)
- fullPath := string(localDir.Child(name))
- remoteFiles[fullPath] = remoteEntry
- return nil
- })
- if err != nil {
- return fmt.Errorf("failed to traverse remote storage: %w", err)
- }
- fmt.Fprintf(writer, "Found %d files/directories in remote storage\n", len(remoteFiles))
- // Step 2: Collect all local files (only if we need to delete local extra files)
- localFiles := make(map[string]*filer_pb.Entry)
- if deleteLocalExtra {
- err = recursivelyTraverseDirectory(commandEnv, dirToSync, func(dir util.FullPath, entry *filer_pb.Entry) bool {
- if entry.RemoteEntry != nil { // only consider files that are part of remote mount
- fullPath := string(dir.Child(entry.Name))
- localFiles[fullPath] = entry
- }
- return true
- })
- if err != nil {
- return fmt.Errorf("failed to traverse local directory: %w", err)
- }
- fmt.Fprintf(writer, "Found %d files/directories in local storage\n", len(localFiles))
- } else {
- fmt.Fprintf(writer, "Skipping local file collection (deleteLocalExtra=false)\n")
- }
- // Step 3: Determine actions needed
- var filesToDelete []string
- var filesToUpdate []string
- var filesToCache []string
- // Find files to delete (exist locally but not remotely) - only if deleteLocalExtra is enabled
- if deleteLocalExtra {
- for localPath := range localFiles {
- if _, exists := remoteFiles[localPath]; !exists {
- filesToDelete = append(filesToDelete, localPath)
- }
- }
- }
- // Find files to update/cache (exist remotely)
- for remotePath, remoteEntry := range remoteFiles {
- if deleteLocalExtra {
- // When deleteLocalExtra is enabled, we have localFiles to compare with
- if localEntry, exists := localFiles[remotePath]; exists {
- // File exists locally, check if it needs updating
- if localEntry.RemoteEntry == nil ||
- localEntry.RemoteEntry.RemoteETag != remoteEntry.RemoteETag ||
- localEntry.RemoteEntry.RemoteMtime < remoteEntry.RemoteMtime {
- filesToUpdate = append(filesToUpdate, remotePath)
- }
- // Check if it needs caching
- if shouldCache && shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
- filesToCache = append(filesToCache, remotePath)
- }
- } else {
- // File doesn't exist locally, needs to be created
- filesToUpdate = append(filesToUpdate, remotePath)
- }
- } else {
- // When deleteLocalExtra is disabled, we check each file individually
- // All remote files are candidates for update/creation
- filesToUpdate = append(filesToUpdate, remotePath)
- // For caching, we need to check if the local file exists and needs caching
- if shouldCache {
- // We need to look up the local file to check if it needs caching
- localDir, name := util.FullPath(remotePath).DirAndName()
- err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- lookupResp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
- Directory: localDir,
- Name: name,
- })
- if lookupErr == nil {
- localEntry := lookupResp.Entry
- if shouldCacheToLocal(localEntry) && fileFilter.matches(localEntry) {
- filesToCache = append(filesToCache, remotePath)
- }
- }
- return nil // Don't propagate lookup errors here
- })
- if err != nil {
- // Log error but continue
- fmt.Fprintf(writer, "Warning: failed to lookup local file %s for caching check: %v\n", remotePath, err)
- }
- }
- }
- }
- fmt.Fprintf(writer, "Actions needed: %d files to delete, %d files to update, %d files to cache\n",
- len(filesToDelete), len(filesToUpdate), len(filesToCache))
- if dryRun {
- fmt.Fprintf(writer, "DRY RUN - showing what would be done:\n")
- for _, path := range filesToDelete {
- fmt.Fprintf(writer, "DELETE: %s\n", path)
- }
- for _, path := range filesToUpdate {
- fmt.Fprintf(writer, "UPDATE: %s\n", path)
- }
- for _, path := range filesToCache {
- fmt.Fprintf(writer, "CACHE: %s\n", path)
- }
- return nil
- }
- // Step 4: Execute actions
- return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- ctx := context.Background()
- // Delete files that no longer exist on remote (only if deleteLocalExtra is enabled)
- if deleteLocalExtra {
- for _, pathToDelete := range filesToDelete {
- fmt.Fprintf(writer, "Deleting %s... ", pathToDelete)
- dir, name := util.FullPath(pathToDelete).DirAndName()
- _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
- Directory: dir,
- Name: name,
- IgnoreRecursiveError: false,
- IsDeleteData: true,
- IsRecursive: false,
- IsFromOtherCluster: false,
- })
- if err != nil {
- fmt.Fprintf(writer, "failed: %v\n", err)
- return err
- }
- fmt.Fprintf(writer, "done\n")
- }
- }
- // Update metadata for files that exist on remote
- for _, pathToUpdate := range filesToUpdate {
- remoteEntry := remoteFiles[pathToUpdate]
- localDir, name := util.FullPath(pathToUpdate).DirAndName()
- fmt.Fprintf(writer, "Updating metadata for %s... ", pathToUpdate)
- // Check if file exists locally
- lookupResp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
- Directory: string(localDir),
- Name: name,
- })
- if lookupErr != nil && lookupErr != filer_pb.ErrNotFound {
- fmt.Fprintf(writer, "failed to lookup: %v\n", lookupErr)
- continue
- }
- isDirectory := remoteEntry.RemoteSize == 0 && remoteEntry.RemoteMtime == 0
- if lookupErr == filer_pb.ErrNotFound {
- // Create new entry
- _, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
- Directory: string(localDir),
- Entry: &filer_pb.Entry{
- Name: name,
- IsDirectory: isDirectory,
- Attributes: &filer_pb.FuseAttributes{
- FileSize: uint64(remoteEntry.RemoteSize),
- Mtime: remoteEntry.RemoteMtime,
- FileMode: uint32(0644),
- },
- RemoteEntry: remoteEntry,
- },
- })
- if createErr != nil {
- fmt.Fprintf(writer, "failed to create: %v\n", createErr)
- continue
- }
- } else {
- // Update existing entry
- existingEntry := lookupResp.Entry
- if existingEntry.RemoteEntry == nil {
- // This is a local file, skip to avoid overwriting
- fmt.Fprintf(writer, "skipped (local file)\n")
- continue
- }
- existingEntry.RemoteEntry = remoteEntry
- existingEntry.Attributes.FileSize = uint64(remoteEntry.RemoteSize)
- existingEntry.Attributes.Mtime = remoteEntry.RemoteMtime
- existingEntry.Attributes.Md5 = nil
- existingEntry.Chunks = nil
- existingEntry.Content = nil
- _, updateErr := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
- Directory: string(localDir),
- Entry: existingEntry,
- })
- if updateErr != nil {
- fmt.Fprintf(writer, "failed to update: %v\n", updateErr)
- continue
- }
- }
- fmt.Fprintf(writer, "done\n")
- }
- // Cache file content if requested
- if shouldCache && len(filesToCache) > 0 {
- fmt.Fprintf(writer, "Caching file content...\n")
- var wg sync.WaitGroup
- limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)
- var executionErr error
- for _, pathToCache := range filesToCache {
- wg.Add(1)
- pathToCacheCopy := pathToCache // Capture for closure
- limitedConcurrentExecutor.Execute(func() {
- defer wg.Done()
- // Get local entry (either from localFiles map or by lookup)
- var localEntry *filer_pb.Entry
- if deleteLocalExtra {
- localEntry = localFiles[pathToCacheCopy]
- if localEntry == nil {
- fmt.Fprintf(writer, "Warning: skipping cache for %s (local entry not found)\n", pathToCacheCopy)
- return
- }
- } else {
- // Look up the local entry since we don't have it in localFiles
- localDir, name := util.FullPath(pathToCacheCopy).DirAndName()
- lookupErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- lookupResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
- Directory: localDir,
- Name: name,
- })
- if err == nil {
- localEntry = lookupResp.Entry
- }
- return err
- })
- if lookupErr != nil {
- fmt.Fprintf(writer, "Warning: failed to lookup local entry for caching %s: %v\n", pathToCacheCopy, lookupErr)
- return
- }
- }
- dir, _ := util.FullPath(pathToCacheCopy).DirAndName()
- remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, util.FullPath(pathToCacheCopy))
- fmt.Fprintf(writer, "Caching %s... ", pathToCacheCopy)
- if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, util.FullPath(dir), localEntry); err != nil {
- fmt.Fprintf(writer, "failed: %v\n", err)
- if executionErr == nil {
- executionErr = err
- }
- return
- }
- fmt.Fprintf(writer, "done\n")
- })
- }
- wg.Wait()
- if executionErr != nil {
- return executionErr
- }
- }
- return nil
- })
- }
- func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory {
- if !visitEntry(dirPath, entry) {
- return nil
- }
- subDir := dirPath.Child(entry.Name)
- if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
- return err
- }
- } else {
- if !visitEntry(dirPath, entry) {
- return nil
- }
- }
- return nil
- })
- return
- }
- func shouldCacheToLocal(entry *filer_pb.Entry) bool {
- if entry.IsDirectory {
- return false
- }
- if entry.RemoteEntry == nil {
- return false
- }
- if entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0 {
- return true
- }
- return false
- }
- func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
- if entry.IsDirectory {
- return false
- }
- if entry.RemoteEntry == nil {
- return false // should not uncache an entry that is not in remote
- }
- if entry.RemoteEntry.LastLocalSyncTsNs > 0 {
- return true
- }
- return false
- }
|