sftp_filer.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // sftp_filer_refactored.go
  2. package sftpd
  3. import (
  4. "context"
  5. "crypto/md5"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "path"
  12. "strings"
  13. "time"
  14. "github.com/pkg/sftp"
  15. "github.com/seaweedfs/seaweedfs/weed/glog"
  16. "github.com/seaweedfs/seaweedfs/weed/pb"
  17. filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  18. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  19. "github.com/seaweedfs/seaweedfs/weed/sftpd/user"
  20. "github.com/seaweedfs/seaweedfs/weed/util"
  21. "google.golang.org/grpc"
  22. )
  23. const (
  24. defaultTimeout = 30 * time.Second
  25. defaultListLimit = 1000
  26. )
  27. // ==================== Filer RPC Helpers ====================
  28. // callWithClient wraps a gRPC client call with timeout and client creation.
  29. func (fs *SftpServer) callWithClient(streaming bool, fn func(ctx context.Context, client filer_pb.SeaweedFilerClient) error) error {
  30. return fs.withTimeoutContext(func(ctx context.Context) error {
  31. return fs.WithFilerClient(streaming, func(client filer_pb.SeaweedFilerClient) error {
  32. return fn(ctx, client)
  33. })
  34. })
  35. }
  36. // getEntry retrieves a single directory entry by path.
  37. func (fs *SftpServer) getEntry(p string) (*filer_pb.Entry, error) {
  38. dir, name := util.FullPath(p).DirAndName()
  39. var entry *filer_pb.Entry
  40. err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  41. r, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{Directory: dir, Name: name})
  42. if err != nil {
  43. if isNotExistError(err) {
  44. return os.ErrNotExist
  45. }
  46. return err
  47. }
  48. if r.Entry == nil {
  49. return fmt.Errorf("%s not found in %s", name, dir)
  50. }
  51. entry = r.Entry
  52. return nil
  53. })
  54. if err != nil {
  55. if isNotExistError(err) {
  56. return nil, os.ErrNotExist
  57. }
  58. return nil, fmt.Errorf("lookup %s: %w", p, err)
  59. }
  60. return entry, nil
  61. }
  62. func isNotExistError(err error) bool {
  63. return strings.Contains(err.Error(), "not found") ||
  64. strings.Contains(err.Error(), "no entry is found") ||
  65. strings.Contains(err.Error(), "file does not exist") ||
  66. err == os.ErrNotExist
  67. }
  68. // updateEntry sends an UpdateEntryRequest for the given entry.
  69. func (fs *SftpServer) updateEntry(dir string, entry *filer_pb.Entry) error {
  70. return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  71. _, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{Directory: dir, Entry: entry})
  72. return err
  73. })
  74. }
  75. // ==================== FilerClient Interface ====================
  76. func (fs *SftpServer) AdjustedUrl(location *filer_pb.Location) string { return location.Url }
  77. func (fs *SftpServer) GetDataCenter() string { return fs.dataCenter }
  78. func (fs *SftpServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  79. addr := fs.filerAddr.ToGrpcAddress()
  80. return pb.WithGrpcClient(streamingMode, util.RandomInt32(), func(conn *grpc.ClientConn) error {
  81. return fn(filer_pb.NewSeaweedFilerClient(conn))
  82. }, addr, false, fs.grpcDialOption)
  83. }
  84. func (fs *SftpServer) withTimeoutContext(fn func(ctx context.Context) error) error {
  85. ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
  86. defer cancel()
  87. return fn(ctx)
  88. }
  89. // ==================== Command Dispatcher ====================
  90. func (fs *SftpServer) dispatchCmd(r *sftp.Request) error {
  91. glog.V(0).Infof("Dispatch: %s %s", r.Method, r.Filepath)
  92. switch r.Method {
  93. case "Remove":
  94. return fs.removeEntry(r)
  95. case "Rename":
  96. return fs.renameEntry(r)
  97. case "Mkdir":
  98. return fs.makeDir(r)
  99. case "Rmdir":
  100. return fs.removeDir(r)
  101. case "Setstat":
  102. return fs.setFileStat(r)
  103. default:
  104. return fmt.Errorf("unsupported: %s", r.Method)
  105. }
  106. }
  107. // ==================== File Operations ====================
  108. func (fs *SftpServer) readFile(r *sftp.Request) (io.ReaderAt, error) {
  109. if err := fs.checkFilePermission(r.Filepath, "read"); err != nil {
  110. return nil, err
  111. }
  112. entry, err := fs.getEntry(r.Filepath)
  113. if err != nil {
  114. return nil, err
  115. }
  116. return NewSeaweedFileReaderAt(fs, entry), nil
  117. }
  118. func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) {
  119. dir, _ := util.FullPath(r.Filepath).DirAndName()
  120. if err := fs.checkFilePermission(dir, "write"); err != nil {
  121. glog.Errorf("Permission denied for %s", dir)
  122. return nil, err
  123. }
  124. // Create a temporary file to buffer writes
  125. tmpFile, err := os.CreateTemp("", "sftp-upload-*")
  126. if err != nil {
  127. return nil, fmt.Errorf("failed to create temp file: %w", err)
  128. }
  129. return &SeaweedSftpFileWriter{
  130. fs: *fs,
  131. req: r,
  132. tmpFile: tmpFile,
  133. permissions: 0644,
  134. uid: fs.user.Uid,
  135. gid: fs.user.Gid,
  136. offset: 0,
  137. }, nil
  138. }
  139. func (fs *SftpServer) removeEntry(r *sftp.Request) error {
  140. return fs.deleteEntry(r.Filepath, false)
  141. }
  142. func (fs *SftpServer) renameEntry(r *sftp.Request) error {
  143. if err := fs.checkFilePermission(r.Filepath, "rename"); err != nil {
  144. return err
  145. }
  146. oldDir, oldName := util.FullPath(r.Filepath).DirAndName()
  147. newDir, newName := util.FullPath(r.Target).DirAndName()
  148. return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  149. _, err := client.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
  150. OldDirectory: oldDir, OldName: oldName,
  151. NewDirectory: newDir, NewName: newName,
  152. })
  153. return err
  154. })
  155. }
  156. func (fs *SftpServer) setFileStat(r *sftp.Request) error {
  157. if err := fs.checkFilePermission(r.Filepath, "write"); err != nil {
  158. return err
  159. }
  160. entry, err := fs.getEntry(r.Filepath)
  161. if err != nil {
  162. return err
  163. }
  164. dir, _ := util.FullPath(r.Filepath).DirAndName()
  165. // apply attrs
  166. if r.AttrFlags().Permissions {
  167. entry.Attributes.FileMode = uint32(r.Attributes().FileMode())
  168. }
  169. if r.AttrFlags().UidGid {
  170. entry.Attributes.Uid = uint32(r.Attributes().UID)
  171. entry.Attributes.Gid = uint32(r.Attributes().GID)
  172. }
  173. if r.AttrFlags().Acmodtime {
  174. entry.Attributes.Mtime = int64(r.Attributes().Mtime)
  175. }
  176. if r.AttrFlags().Size {
  177. entry.Attributes.FileSize = uint64(r.Attributes().Size)
  178. }
  179. return fs.updateEntry(dir, entry)
  180. }
  181. // ==================== Directory Operations ====================
  182. func (fs *SftpServer) listDir(r *sftp.Request) (sftp.ListerAt, error) {
  183. if err := fs.checkFilePermission(r.Filepath, "list"); err != nil {
  184. return nil, err
  185. }
  186. if r.Method == "Stat" || r.Method == "Lstat" {
  187. entry, err := fs.getEntry(r.Filepath)
  188. if err != nil {
  189. return nil, err
  190. }
  191. fi := &EnhancedFileInfo{FileInfo: FileInfoFromEntry(entry), uid: entry.Attributes.Uid, gid: entry.Attributes.Gid}
  192. return listerat([]os.FileInfo{fi}), nil
  193. }
  194. return fs.listAllPages(r.Filepath)
  195. }
  196. func (fs *SftpServer) listAllPages(dirPath string) (sftp.ListerAt, error) {
  197. var all []os.FileInfo
  198. last := ""
  199. for {
  200. page, err := fs.fetchDirectoryPage(dirPath, last)
  201. if err != nil {
  202. return nil, err
  203. }
  204. all = append(all, page...)
  205. if len(page) < defaultListLimit {
  206. break
  207. }
  208. last = page[len(page)-1].Name()
  209. }
  210. return listerat(all), nil
  211. }
  212. func (fs *SftpServer) fetchDirectoryPage(dirPath, start string) ([]os.FileInfo, error) {
  213. var list []os.FileInfo
  214. err := fs.callWithClient(true, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  215. stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{Directory: dirPath, StartFromFileName: start, Limit: defaultListLimit})
  216. if err != nil {
  217. return err
  218. }
  219. for {
  220. r, err := stream.Recv()
  221. if err == io.EOF {
  222. break
  223. }
  224. if err != nil || r.Entry == nil {
  225. continue
  226. }
  227. p := path.Join(dirPath, r.Entry.Name)
  228. if err := fs.checkFilePermission(p, "list"); err != nil {
  229. continue
  230. }
  231. list = append(list, &EnhancedFileInfo{FileInfo: FileInfoFromEntry(r.Entry), uid: r.Entry.Attributes.Uid, gid: r.Entry.Attributes.Gid})
  232. }
  233. return nil
  234. })
  235. return list, err
  236. }
  237. // makeDir creates a new directory with proper permissions.
  238. func (fs *SftpServer) makeDir(r *sftp.Request) error {
  239. if fs.user == nil {
  240. return fmt.Errorf("cannot create directory: no user info")
  241. }
  242. dir, name := util.FullPath(r.Filepath).DirAndName()
  243. if err := fs.checkFilePermission(r.Filepath, "mkdir"); err != nil {
  244. return err
  245. }
  246. // default mode and ownership
  247. err := filer_pb.Mkdir(context.Background(), fs, string(dir), name, func(entry *filer_pb.Entry) {
  248. mode := uint32(0755 | os.ModeDir)
  249. if strings.HasPrefix(r.Filepath, fs.user.HomeDir) {
  250. mode = uint32(0700 | os.ModeDir)
  251. }
  252. entry.Attributes.FileMode = mode
  253. entry.Attributes.Uid = fs.user.Uid
  254. entry.Attributes.Gid = fs.user.Gid
  255. now := time.Now().Unix()
  256. entry.Attributes.Crtime = now
  257. entry.Attributes.Mtime = now
  258. if entry.Extended == nil {
  259. entry.Extended = make(map[string][]byte)
  260. }
  261. entry.Extended["creator"] = []byte(fs.user.Username)
  262. })
  263. return err
  264. }
  265. // removeDir deletes a directory.
  266. func (fs *SftpServer) removeDir(r *sftp.Request) error {
  267. return fs.deleteEntry(r.Filepath, false)
  268. }
  269. func (fs *SftpServer) putFile(filepath string, reader io.Reader, user *user.User) error {
  270. dir, filename := util.FullPath(filepath).DirAndName()
  271. uploadUrl := fmt.Sprintf("http://%s%s", fs.filerAddr, filepath)
  272. // Compute MD5 while uploading
  273. hash := md5.New()
  274. body := io.TeeReader(reader, hash)
  275. // We can skip ContentLength if unknown (chunked transfer encoding)
  276. req, err := http.NewRequest(http.MethodPut, uploadUrl, body)
  277. if err != nil {
  278. return fmt.Errorf("create request: %w", err)
  279. }
  280. req.Header.Set("Content-Type", "application/octet-stream")
  281. resp, err := http.DefaultClient.Do(req)
  282. if err != nil {
  283. return fmt.Errorf("upload to filer: %w", err)
  284. }
  285. defer resp.Body.Close()
  286. respBody, err := io.ReadAll(resp.Body)
  287. if err != nil {
  288. return fmt.Errorf("read response: %w", err)
  289. }
  290. if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
  291. return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(respBody))
  292. }
  293. var result weed_server.FilerPostResult
  294. if err := json.Unmarshal(respBody, &result); err != nil {
  295. return fmt.Errorf("parse response: %w", err)
  296. }
  297. if result.Error != "" {
  298. return fmt.Errorf("filer error: %s", result.Error)
  299. }
  300. // Update file ownership using the same pattern as other functions
  301. if user != nil {
  302. err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  303. // Look up the file to get its current entry
  304. lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
  305. Directory: dir,
  306. Name: filename,
  307. })
  308. if err != nil {
  309. return fmt.Errorf("lookup file for attribute update: %w", err)
  310. }
  311. if lookupResp.Entry == nil {
  312. return fmt.Errorf("file not found after upload: %s/%s", dir, filename)
  313. }
  314. // Update the entry with new uid/gid
  315. entry := lookupResp.Entry
  316. entry.Attributes.Uid = user.Uid
  317. entry.Attributes.Gid = user.Gid
  318. // Update the entry in the filer
  319. _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{
  320. Directory: dir,
  321. Entry: entry,
  322. })
  323. return err
  324. })
  325. if err != nil {
  326. // Log the error but don't fail the whole operation
  327. glog.Errorf("Failed to update file ownership for %s: %v", filepath, err)
  328. }
  329. }
  330. return nil
  331. }
  332. // ==================== Common Arguments Helpers ====================
  333. func FileInfoFromEntry(e *filer_pb.Entry) FileInfo {
  334. return FileInfo{name: e.Name, size: int64(e.Attributes.FileSize), mode: os.FileMode(e.Attributes.FileMode), modTime: time.Unix(e.Attributes.Mtime, 0), isDir: e.IsDirectory}
  335. }
  336. func (fs *SftpServer) deleteEntry(p string, recursive bool) error {
  337. if err := fs.checkFilePermission(p, "delete"); err != nil {
  338. return err
  339. }
  340. dir, name := util.FullPath(p).DirAndName()
  341. return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  342. r, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{Directory: dir, Name: name, IsDeleteData: true, IsRecursive: recursive})
  343. if err != nil {
  344. return err
  345. }
  346. if r.Error != "" {
  347. return fmt.Errorf("%s", r.Error)
  348. }
  349. return nil
  350. })
  351. }
  352. // ==================== Custom Types ====================
  353. type EnhancedFileInfo struct {
  354. FileInfo
  355. uid uint32
  356. gid uint32
  357. }
  358. // FileStat represents file statistics in a platform-independent way
  359. type FileStat struct {
  360. Uid uint32
  361. Gid uint32
  362. }
  363. func (fi *EnhancedFileInfo) Sys() interface{} {
  364. return &FileStat{Uid: fi.uid, Gid: fi.gid}
  365. }
  366. func (fi *EnhancedFileInfo) Owner() (uid, gid int) {
  367. return int(fi.uid), int(fi.gid)
  368. }
  369. func (fs *SftpServer) checkFilePermission(filepath string, permissions string) error {
  370. return fs.CheckFilePermission(filepath, permissions)
  371. }