mount_std.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. //go:build linux || darwin || freebsd
  2. // +build linux darwin freebsd
  3. package command
  4. import (
  5. "context"
  6. "fmt"
  7. "net"
  8. "net/http"
  9. "os"
  10. "os/user"
  11. "runtime"
  12. "strconv"
  13. "strings"
  14. "syscall"
  15. "time"
  16. "github.com/seaweedfs/seaweedfs/weed/util/version"
  17. "github.com/hanwen/go-fuse/v2/fuse"
  18. "github.com/seaweedfs/seaweedfs/weed/glog"
  19. "github.com/seaweedfs/seaweedfs/weed/mount"
  20. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  21. "github.com/seaweedfs/seaweedfs/weed/mount/unmount"
  22. "github.com/seaweedfs/seaweedfs/weed/pb"
  23. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  24. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  25. "github.com/seaweedfs/seaweedfs/weed/security"
  26. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  27. "google.golang.org/grpc/reflection"
  28. "github.com/seaweedfs/seaweedfs/weed/util"
  29. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  30. )
  31. func runMount(cmd *Command, args []string) bool {
  32. if *mountOptions.debug {
  33. go http.ListenAndServe(fmt.Sprintf(":%d", *mountOptions.debugPort), nil)
  34. }
  35. grace.SetupProfiling(*mountCpuProfile, *mountMemProfile)
  36. if *mountReadRetryTime < time.Second {
  37. *mountReadRetryTime = time.Second
  38. }
  39. util.RetryWaitTime = *mountReadRetryTime
  40. umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
  41. if umaskErr != nil {
  42. fmt.Printf("can not parse umask %s", *mountOptions.umaskString)
  43. return false
  44. }
  45. if len(args) > 0 {
  46. return false
  47. }
  48. return RunMount(&mountOptions, os.FileMode(umask))
  49. }
  50. func RunMount(option *MountOptions, umask os.FileMode) bool {
  51. // basic checks
  52. chunkSizeLimitMB := *mountOptions.chunkSizeLimitMB
  53. if chunkSizeLimitMB <= 0 {
  54. fmt.Printf("Please specify a reasonable buffer size.\n")
  55. return false
  56. }
  57. // try to connect to filer
  58. filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
  59. util.LoadSecurityConfiguration()
  60. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  61. var cipher bool
  62. var err error
  63. for i := 0; i < 10; i++ {
  64. err = pb.WithOneOfGrpcFilerClients(false, filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  65. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  66. if err != nil {
  67. return fmt.Errorf("get filer grpc address %v configuration: %w", filerAddresses, err)
  68. }
  69. cipher = resp.Cipher
  70. return nil
  71. })
  72. if err != nil {
  73. glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
  74. glog.V(0).Infof("wait for %d seconds ...", i+1)
  75. time.Sleep(time.Duration(i+1) * time.Second)
  76. }
  77. }
  78. if err != nil {
  79. glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
  80. return true
  81. }
  82. filerMountRootPath := *option.filerMountRootPath
  83. // clean up mount point
  84. dir := util.ResolvePath(*option.dir)
  85. if dir == "" {
  86. fmt.Printf("Please specify the mount directory via \"-dir\"")
  87. return false
  88. }
  89. unmount.Unmount(dir)
  90. // start on local unix socket
  91. if *option.localSocket == "" {
  92. mountDirHash := util.HashToInt32([]byte(dir))
  93. if mountDirHash < 0 {
  94. mountDirHash = -mountDirHash
  95. }
  96. *option.localSocket = fmt.Sprintf("/tmp/seaweedfs-mount-%d.sock", mountDirHash)
  97. }
  98. if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) {
  99. glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error())
  100. }
  101. montSocketListener, err := net.Listen("unix", *option.localSocket)
  102. if err != nil {
  103. glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err)
  104. }
  105. // detect mount folder mode
  106. if *option.dirAutoCreate {
  107. os.MkdirAll(dir, os.FileMode(0777)&^umask)
  108. }
  109. fileInfo, err := os.Stat(dir)
  110. // collect uid, gid
  111. uid, gid := uint32(0), uint32(0)
  112. mountMode := os.ModeDir | 0777
  113. if err == nil {
  114. mountMode = os.ModeDir | os.FileMode(0777)&^umask
  115. uid, gid = util.GetFileUidGid(fileInfo)
  116. fmt.Printf("mount point owner uid=%d gid=%d mode=%s\n", uid, gid, mountMode)
  117. } else {
  118. fmt.Printf("can not stat %s\n", dir)
  119. return false
  120. }
  121. // detect uid, gid
  122. if uid == 0 {
  123. if u, err := user.Current(); err == nil {
  124. if parsedId, pe := strconv.ParseUint(u.Uid, 10, 32); pe == nil {
  125. uid = uint32(parsedId)
  126. }
  127. if parsedId, pe := strconv.ParseUint(u.Gid, 10, 32); pe == nil {
  128. gid = uint32(parsedId)
  129. }
  130. fmt.Printf("current uid=%d gid=%d\n", uid, gid)
  131. }
  132. }
  133. // mapping uid, gid
  134. uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
  135. if err != nil {
  136. fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
  137. return false
  138. }
  139. // Ensure target mount point availability
  140. if isValid := checkMountPointAvailable(dir); !isValid {
  141. glog.Fatalf("Target mount point is not available: %s, please check!", dir)
  142. return true
  143. }
  144. serverFriendlyName := strings.ReplaceAll(*option.filer, ",", "+")
  145. // mount fuse
  146. fuseMountOptions := &fuse.MountOptions{
  147. AllowOther: *option.allowOthers,
  148. Options: option.extraOptions,
  149. MaxBackground: 128,
  150. MaxWrite: 1024 * 1024 * 2,
  151. MaxReadAhead: 1024 * 1024 * 2,
  152. IgnoreSecurityLabels: false,
  153. RememberInodes: false,
  154. FsName: serverFriendlyName + ":" + filerMountRootPath,
  155. Name: "seaweedfs",
  156. SingleThreaded: false,
  157. DisableXAttrs: *option.disableXAttr,
  158. Debug: *option.debug,
  159. EnableLocks: false,
  160. ExplicitDataCacheControl: false,
  161. DirectMount: true,
  162. DirectMountFlags: 0,
  163. //SyncRead: false, // set to false to enable the FUSE_CAP_ASYNC_READ capability
  164. EnableAcl: true,
  165. }
  166. if *option.nonempty {
  167. fuseMountOptions.Options = append(fuseMountOptions.Options, "nonempty")
  168. }
  169. if *option.readOnly {
  170. if runtime.GOOS == "darwin" {
  171. fuseMountOptions.Options = append(fuseMountOptions.Options, "rdonly")
  172. } else {
  173. fuseMountOptions.Options = append(fuseMountOptions.Options, "ro")
  174. }
  175. }
  176. if runtime.GOOS == "darwin" {
  177. // https://github-wiki-see.page/m/macfuse/macfuse/wiki/Mount-Options
  178. ioSizeMB := 1
  179. for ioSizeMB*2 <= *option.chunkSizeLimitMB && ioSizeMB*2 <= 32 {
  180. ioSizeMB *= 2
  181. }
  182. fuseMountOptions.Options = append(fuseMountOptions.Options, "daemon_timeout=600")
  183. if runtime.GOARCH == "amd64" {
  184. fuseMountOptions.Options = append(fuseMountOptions.Options, "noapplexattr")
  185. }
  186. // fuseMountOptions.Options = append(fuseMountOptions.Options, "novncache") // need to test effectiveness
  187. fuseMountOptions.Options = append(fuseMountOptions.Options, "slow_statfs")
  188. fuseMountOptions.Options = append(fuseMountOptions.Options, "volname="+serverFriendlyName)
  189. fuseMountOptions.Options = append(fuseMountOptions.Options, fmt.Sprintf("iosize=%d", ioSizeMB*1024*1024))
  190. }
  191. // find mount point
  192. mountRoot := filerMountRootPath
  193. if mountRoot != "/" && strings.HasSuffix(mountRoot, "/") {
  194. mountRoot = mountRoot[0 : len(mountRoot)-1]
  195. }
  196. cacheDirForWrite := *option.cacheDirForWrite
  197. if cacheDirForWrite == "" {
  198. cacheDirForWrite = *option.cacheDirForRead
  199. }
  200. seaweedFileSystem := mount.NewSeaweedFileSystem(&mount.Option{
  201. MountDirectory: dir,
  202. FilerAddresses: filerAddresses,
  203. GrpcDialOption: grpcDialOption,
  204. FilerMountRootPath: mountRoot,
  205. Collection: *option.collection,
  206. Replication: *option.replication,
  207. TtlSec: int32(*option.ttlSec),
  208. DiskType: types.ToDiskType(*option.diskType),
  209. ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
  210. ConcurrentWriters: *option.concurrentWriters,
  211. CacheDirForRead: *option.cacheDirForRead,
  212. CacheSizeMBForRead: *option.cacheSizeMBForRead,
  213. CacheDirForWrite: cacheDirForWrite,
  214. CacheMetaTTlSec: *option.cacheMetaTtlSec,
  215. DataCenter: *option.dataCenter,
  216. Quota: int64(*option.collectionQuota) * 1024 * 1024,
  217. MountUid: uid,
  218. MountGid: gid,
  219. MountMode: mountMode,
  220. MountCtime: fileInfo.ModTime(),
  221. MountMtime: time.Now(),
  222. Umask: umask,
  223. VolumeServerAccess: *mountOptions.volumeServerAccess,
  224. Cipher: cipher,
  225. UidGidMapper: uidGidMapper,
  226. DisableXAttr: *option.disableXAttr,
  227. IsMacOs: runtime.GOOS == "darwin",
  228. // RDMA acceleration options
  229. RdmaEnabled: *option.rdmaEnabled,
  230. RdmaSidecarAddr: *option.rdmaSidecarAddr,
  231. RdmaFallback: *option.rdmaFallback,
  232. RdmaReadOnly: *option.rdmaReadOnly,
  233. RdmaMaxConcurrent: *option.rdmaMaxConcurrent,
  234. RdmaTimeoutMs: *option.rdmaTimeoutMs,
  235. })
  236. // create mount root
  237. mountRootPath := util.FullPath(mountRoot)
  238. mountRootParent, mountDir := mountRootPath.DirAndName()
  239. if err = filer_pb.Mkdir(context.Background(), seaweedFileSystem, mountRootParent, mountDir, nil); err != nil {
  240. fmt.Printf("failed to create dir %s on filer %s: %v\n", mountRoot, filerAddresses, err)
  241. return false
  242. }
  243. server, err := fuse.NewServer(seaweedFileSystem, dir, fuseMountOptions)
  244. if err != nil {
  245. glog.Fatalf("Mount fail: %v", err)
  246. }
  247. grace.OnInterrupt(func() {
  248. unmount.Unmount(dir)
  249. })
  250. if mountOptions.fuseCommandPid != 0 {
  251. // send a signal to the parent process to notify that the mount is ready
  252. err = syscall.Kill(mountOptions.fuseCommandPid, syscall.SIGTERM)
  253. if err != nil {
  254. fmt.Printf("failed to notify parent process: %v\n", err)
  255. return false
  256. }
  257. }
  258. grpcS := pb.NewGrpcServer()
  259. mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem)
  260. reflection.Register(grpcS)
  261. go grpcS.Serve(montSocketListener)
  262. err = seaweedFileSystem.StartBackgroundTasks()
  263. if err != nil {
  264. fmt.Printf("failed to start background tasks: %v\n", err)
  265. return false
  266. }
  267. glog.V(0).Infof("mounted %s%s to %v", *option.filer, mountRoot, dir)
  268. glog.V(0).Infof("This is SeaweedFS version %s %s %s", version.Version(), runtime.GOOS, runtime.GOARCH)
  269. server.Serve()
  270. seaweedFileSystem.ClearCacheDir()
  271. return true
  272. }