weedfs.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package mount
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand/v2"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "github.com/hanwen/go-fuse/v2/fuse"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/filer"
  15. "github.com/seaweedfs/seaweedfs/weed/glog"
  16. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  17. "github.com/seaweedfs/seaweedfs/weed/pb"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  20. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  21. "github.com/seaweedfs/seaweedfs/weed/util"
  22. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  23. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  24. "github.com/seaweedfs/seaweedfs/weed/util/version"
  25. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  26. "github.com/hanwen/go-fuse/v2/fs"
  27. )
  28. type Option struct {
  29. filerIndex int32 // align memory for atomic read/write
  30. FilerAddresses []pb.ServerAddress
  31. MountDirectory string
  32. GrpcDialOption grpc.DialOption
  33. FilerMountRootPath string
  34. Collection string
  35. Replication string
  36. TtlSec int32
  37. DiskType types.DiskType
  38. ChunkSizeLimit int64
  39. ConcurrentWriters int
  40. CacheDirForRead string
  41. CacheSizeMBForRead int64
  42. CacheDirForWrite string
  43. CacheMetaTTlSec int
  44. DataCenter string
  45. Umask os.FileMode
  46. Quota int64
  47. DisableXAttr bool
  48. IsMacOs bool
  49. MountUid uint32
  50. MountGid uint32
  51. MountMode os.FileMode
  52. MountCtime time.Time
  53. MountMtime time.Time
  54. MountParentInode uint64
  55. VolumeServerAccess string // how to access volume servers
  56. Cipher bool // whether encrypt data on volume server
  57. UidGidMapper *meta_cache.UidGidMapper
  58. // RDMA acceleration options
  59. RdmaEnabled bool
  60. RdmaSidecarAddr string
  61. RdmaFallback bool
  62. RdmaReadOnly bool
  63. RdmaMaxConcurrent int
  64. RdmaTimeoutMs int
  65. uniqueCacheDirForRead string
  66. uniqueCacheDirForWrite string
  67. }
  68. type WFS struct {
  69. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  70. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  71. fuse.RawFileSystem
  72. mount_pb.UnimplementedSeaweedMountServer
  73. fs.Inode
  74. option *Option
  75. metaCache *meta_cache.MetaCache
  76. stats statsCache
  77. chunkCache *chunk_cache.TieredChunkCache
  78. signature int32
  79. concurrentWriters *util.LimitedConcurrentExecutor
  80. copyBufferPool sync.Pool
  81. concurrentCopiersSem chan struct{}
  82. inodeToPath *InodeToPath
  83. fhMap *FileHandleToInode
  84. dhMap *DirectoryHandleToInode
  85. fuseServer *fuse.Server
  86. IsOverQuota bool
  87. fhLockTable *util.LockTable[FileHandleId]
  88. rdmaClient *RDMAMountClient
  89. FilerConf *filer.FilerConf
  90. }
  91. func NewSeaweedFileSystem(option *Option) *WFS {
  92. wfs := &WFS{
  93. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  94. option: option,
  95. signature: util.RandomInt32(),
  96. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
  97. fhMap: NewFileHandleToInode(),
  98. dhMap: NewDirectoryHandleToInode(),
  99. fhLockTable: util.NewLockTable[FileHandleId](),
  100. }
  101. wfs.option.filerIndex = int32(rand.IntN(len(option.FilerAddresses)))
  102. wfs.option.setupUniqueCacheDirectory()
  103. if option.CacheSizeMBForRead > 0 {
  104. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  105. }
  106. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  107. util.FullPath(option.FilerMountRootPath),
  108. func(path util.FullPath) {
  109. wfs.inodeToPath.MarkChildrenCached(path)
  110. }, func(path util.FullPath) bool {
  111. return wfs.inodeToPath.IsChildrenCached(path)
  112. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  113. // Find inode if it is not a deleted path
  114. if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
  115. // Find open file handle
  116. if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
  117. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  118. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  119. // Recreate dirty pages
  120. fh.dirtyPages.Destroy()
  121. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  122. // Update handle entry
  123. newEntry, status := wfs.maybeLoadEntry(filePath)
  124. if status == fuse.OK {
  125. if fh.GetEntry().GetEntry() != newEntry {
  126. fh.SetEntry(newEntry)
  127. }
  128. }
  129. }
  130. }
  131. })
  132. grace.OnInterrupt(func() {
  133. wfs.metaCache.Shutdown()
  134. os.RemoveAll(option.getUniqueCacheDirForWrite())
  135. os.RemoveAll(option.getUniqueCacheDirForRead())
  136. if wfs.rdmaClient != nil {
  137. wfs.rdmaClient.Close()
  138. }
  139. })
  140. // Initialize RDMA client if enabled
  141. if option.RdmaEnabled && option.RdmaSidecarAddr != "" {
  142. rdmaClient, err := NewRDMAMountClient(
  143. option.RdmaSidecarAddr,
  144. wfs.LookupFn(),
  145. option.RdmaMaxConcurrent,
  146. option.RdmaTimeoutMs,
  147. )
  148. if err != nil {
  149. glog.Warningf("Failed to initialize RDMA client: %v", err)
  150. } else {
  151. wfs.rdmaClient = rdmaClient
  152. glog.Infof("RDMA acceleration enabled: sidecar=%s, maxConcurrent=%d, timeout=%dms",
  153. option.RdmaSidecarAddr, option.RdmaMaxConcurrent, option.RdmaTimeoutMs)
  154. }
  155. }
  156. if wfs.option.ConcurrentWriters > 0 {
  157. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  158. wfs.concurrentCopiersSem = make(chan struct{}, wfs.option.ConcurrentWriters)
  159. }
  160. wfs.copyBufferPool.New = func() any {
  161. return make([]byte, option.ChunkSizeLimit)
  162. }
  163. return wfs
  164. }
  165. func (wfs *WFS) StartBackgroundTasks() error {
  166. follower, err := wfs.subscribeFilerConfEvents()
  167. if err != nil {
  168. return err
  169. }
  170. startTime := time.Now()
  171. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
  172. go wfs.loopCheckQuota()
  173. return nil
  174. }
  175. func (wfs *WFS) String() string {
  176. return "seaweedfs"
  177. }
  178. func (wfs *WFS) Init(server *fuse.Server) {
  179. wfs.fuseServer = server
  180. }
  181. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  182. path, status = wfs.inodeToPath.GetPath(inode)
  183. if status != fuse.OK {
  184. return
  185. }
  186. var found bool
  187. if fh, found = wfs.fhMap.FindFileHandle(inode); found {
  188. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  189. if entry != nil && fh.entry.Attributes == nil {
  190. entry.Attributes = &filer_pb.FuseAttributes{}
  191. }
  192. })
  193. } else {
  194. entry, status = wfs.maybeLoadEntry(path)
  195. }
  196. return
  197. }
  198. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  199. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  200. dir, name := fullpath.DirAndName()
  201. // return a valid entry for the mount root
  202. if string(fullpath) == wfs.option.FilerMountRootPath {
  203. return &filer_pb.Entry{
  204. Name: name,
  205. IsDirectory: true,
  206. Attributes: &filer_pb.FuseAttributes{
  207. Mtime: wfs.option.MountMtime.Unix(),
  208. FileMode: uint32(wfs.option.MountMode),
  209. Uid: wfs.option.MountUid,
  210. Gid: wfs.option.MountGid,
  211. Crtime: wfs.option.MountCtime.Unix(),
  212. },
  213. }, fuse.OK
  214. }
  215. // read from async meta cache
  216. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  217. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  218. if errors.Is(cacheErr, filer_pb.ErrNotFound) {
  219. return nil, fuse.ENOENT
  220. }
  221. return cachedEntry.ToProtoEntry(), fuse.OK
  222. }
  223. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  224. if wfs.option.VolumeServerAccess == "filerProxy" {
  225. return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
  226. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  227. }
  228. }
  229. return filer.LookupFn(wfs)
  230. }
  231. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  232. i := atomic.LoadInt32(&wfs.option.filerIndex)
  233. return wfs.option.FilerAddresses[i]
  234. }
  235. func (wfs *WFS) ClearCacheDir() {
  236. wfs.metaCache.Shutdown()
  237. os.RemoveAll(wfs.option.getUniqueCacheDirForWrite())
  238. os.RemoveAll(wfs.option.getUniqueCacheDirForRead())
  239. }
  240. func (option *Option) setupUniqueCacheDirectory() {
  241. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + version.Version()))[0:8]
  242. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  243. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  244. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  245. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  246. }
  247. func (option *Option) getUniqueCacheDirForWrite() string {
  248. return option.uniqueCacheDirForWrite
  249. }
  250. func (option *Option) getUniqueCacheDirForRead() string {
  251. return option.uniqueCacheDirForRead
  252. }