webdav_server.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/util/version"
  11. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  12. "golang.org/x/net/webdav"
  13. "google.golang.org/grpc"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. "github.com/seaweedfs/seaweedfs/weed/glog"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. )
  23. type WebDavOption struct {
  24. Filer pb.ServerAddress
  25. FilerRootPath string
  26. DomainName string
  27. BucketsPath string
  28. GrpcDialOption grpc.DialOption
  29. Collection string
  30. Replication string
  31. DiskType string
  32. Uid uint32
  33. Gid uint32
  34. Cipher bool
  35. CacheDir string
  36. CacheSizeMB int64
  37. MaxMB int
  38. }
  39. type WebDavServer struct {
  40. option *WebDavOption
  41. secret security.SigningKey
  42. filer *filer.Filer
  43. grpcDialOption grpc.DialOption
  44. Handler *webdav.Handler
  45. }
  46. func max(x, y int64) int64 {
  47. if x <= y {
  48. return y
  49. }
  50. return x
  51. }
  52. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  53. fs, _ := NewWebDavFileSystem(option)
  54. // Fix no set filer.path , accessing "/" returns "//"
  55. if option.FilerRootPath == "/" {
  56. option.FilerRootPath = ""
  57. }
  58. // filer.path non "/" option means we are accessing filer's sub-folders
  59. if option.FilerRootPath != "" {
  60. fs = NewWrappedFs(fs, path.Clean(option.FilerRootPath))
  61. }
  62. ws = &WebDavServer{
  63. option: option,
  64. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  65. Handler: &webdav.Handler{
  66. FileSystem: fs,
  67. LockSystem: webdav.NewMemLS(),
  68. },
  69. }
  70. return ws, nil
  71. }
  72. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  73. type WebDavFileSystem struct {
  74. option *WebDavOption
  75. secret security.SigningKey
  76. grpcDialOption grpc.DialOption
  77. chunkCache *chunk_cache.TieredChunkCache
  78. readerCache *filer.ReaderCache
  79. signature int32
  80. }
  81. type FileInfo struct {
  82. name string
  83. size int64
  84. mode os.FileMode
  85. modifiedTime time.Time
  86. etag string
  87. isDirectory bool
  88. err error
  89. }
  90. func (fi *FileInfo) Name() string { return fi.name }
  91. func (fi *FileInfo) Size() int64 { return fi.size }
  92. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  93. func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
  94. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  95. func (fi *FileInfo) Sys() interface{} { return nil }
  96. func (fi *FileInfo) ETag(ctx context.Context) (string, error) {
  97. if fi.err != nil {
  98. return "", fi.err
  99. }
  100. return fi.etag, nil
  101. }
  102. type WebDavFile struct {
  103. fs *WebDavFileSystem
  104. name string
  105. isDirectory bool
  106. off int64
  107. entry *filer_pb.Entry
  108. visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
  109. reader io.ReaderAt
  110. bufWriter *buffered_writer.BufferedWriteCloser
  111. ctx context.Context
  112. }
  113. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  114. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + version.Version()))[0:8]
  115. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  116. os.MkdirAll(cacheDir, os.FileMode(0755))
  117. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  118. t := &WebDavFileSystem{
  119. option: option,
  120. chunkCache: chunkCache,
  121. signature: util.RandomInt32(),
  122. }
  123. t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t))
  124. return t, nil
  125. }
  126. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  127. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  128. return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
  129. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  130. return fn(client)
  131. }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
  132. }
  133. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  134. return location.Url
  135. }
  136. func (fs *WebDavFileSystem) GetDataCenter() string {
  137. return ""
  138. }
  139. func clearName(name string) (string, error) {
  140. slashed := strings.HasSuffix(name, "/")
  141. name = path.Clean(name)
  142. if !strings.HasSuffix(name, "/") && slashed {
  143. name += "/"
  144. }
  145. if !strings.HasPrefix(name, "/") {
  146. return "", os.ErrInvalid
  147. }
  148. return name, nil
  149. }
  150. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  151. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  152. if !strings.HasSuffix(fullDirPath, "/") {
  153. fullDirPath += "/"
  154. }
  155. var err error
  156. if fullDirPath, err = clearName(fullDirPath); err != nil {
  157. return err
  158. }
  159. _, err = fs.stat(ctx, fullDirPath)
  160. if err == nil {
  161. return os.ErrExist
  162. }
  163. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  164. dir, name := util.FullPath(fullDirPath).DirAndName()
  165. request := &filer_pb.CreateEntryRequest{
  166. Directory: dir,
  167. Entry: &filer_pb.Entry{
  168. Name: name,
  169. IsDirectory: true,
  170. Attributes: &filer_pb.FuseAttributes{
  171. Mtime: time.Now().Unix(),
  172. Crtime: time.Now().Unix(),
  173. FileMode: uint32(perm | os.ModeDir),
  174. Uid: fs.option.Uid,
  175. Gid: fs.option.Gid,
  176. },
  177. },
  178. Signatures: []int32{fs.signature},
  179. }
  180. glog.V(1).Infof("mkdir: %v", request)
  181. if err := filer_pb.CreateEntry(context.Background(), client, request); err != nil {
  182. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  183. }
  184. return nil
  185. })
  186. }
  187. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  188. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  189. var err error
  190. if fullFilePath, err = clearName(fullFilePath); err != nil {
  191. return nil, err
  192. }
  193. if flag&os.O_CREATE != 0 {
  194. // file should not have / suffix.
  195. if strings.HasSuffix(fullFilePath, "/") {
  196. return nil, os.ErrInvalid
  197. }
  198. _, err = fs.stat(ctx, fullFilePath)
  199. if err == nil {
  200. if flag&os.O_EXCL != 0 {
  201. return nil, os.ErrExist
  202. }
  203. fs.removeAll(ctx, fullFilePath)
  204. }
  205. dir, name := util.FullPath(fullFilePath).DirAndName()
  206. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  207. if err := filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
  208. Directory: dir,
  209. Entry: &filer_pb.Entry{
  210. Name: name,
  211. IsDirectory: perm&os.ModeDir > 0,
  212. Attributes: &filer_pb.FuseAttributes{
  213. Mtime: 0,
  214. Crtime: time.Now().Unix(),
  215. FileMode: uint32(perm),
  216. Uid: fs.option.Uid,
  217. Gid: fs.option.Gid,
  218. TtlSec: 0,
  219. },
  220. },
  221. Signatures: []int32{fs.signature},
  222. }); err != nil {
  223. return fmt.Errorf("create %s: %v", fullFilePath, err)
  224. }
  225. return nil
  226. })
  227. if err != nil {
  228. return nil, err
  229. }
  230. return &WebDavFile{
  231. fs: fs,
  232. name: fullFilePath,
  233. isDirectory: false,
  234. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  235. ctx: ctx,
  236. }, nil
  237. }
  238. fi, err := fs.stat(ctx, fullFilePath)
  239. if err != nil {
  240. if err == os.ErrNotExist {
  241. return nil, err
  242. }
  243. return &WebDavFile{fs: fs, ctx: ctx}, nil
  244. }
  245. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  246. fullFilePath += "/"
  247. }
  248. return &WebDavFile{
  249. fs: fs,
  250. name: fullFilePath,
  251. isDirectory: false,
  252. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  253. ctx: ctx,
  254. }, nil
  255. }
  256. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  257. var err error
  258. if fullFilePath, err = clearName(fullFilePath); err != nil {
  259. return err
  260. }
  261. dir, name := util.FullPath(fullFilePath).DirAndName()
  262. return filer_pb.Remove(context.Background(), fs, dir, name, true, false, false, false, []int32{fs.signature})
  263. }
  264. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  265. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  266. return fs.removeAll(ctx, name)
  267. }
  268. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  269. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  270. var err error
  271. if oldName, err = clearName(oldName); err != nil {
  272. return err
  273. }
  274. if newName, err = clearName(newName); err != nil {
  275. return err
  276. }
  277. of, err := fs.stat(ctx, oldName)
  278. if err != nil {
  279. return os.ErrExist
  280. }
  281. if of.IsDir() {
  282. if strings.HasSuffix(oldName, "/") {
  283. oldName = strings.TrimRight(oldName, "/")
  284. }
  285. if strings.HasSuffix(newName, "/") {
  286. newName = strings.TrimRight(newName, "/")
  287. }
  288. }
  289. _, err = fs.stat(ctx, newName)
  290. if err == nil {
  291. return os.ErrExist
  292. }
  293. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  294. newDir, newBaseName := util.FullPath(newName).DirAndName()
  295. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  296. request := &filer_pb.AtomicRenameEntryRequest{
  297. OldDirectory: oldDir,
  298. OldName: oldBaseName,
  299. NewDirectory: newDir,
  300. NewName: newBaseName,
  301. }
  302. _, err := client.AtomicRenameEntry(ctx, request)
  303. if err != nil {
  304. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  305. }
  306. return nil
  307. })
  308. }
  309. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  310. var err error
  311. if fullFilePath, err = clearName(fullFilePath); err != nil {
  312. return nil, err
  313. }
  314. fullpath := util.FullPath(fullFilePath)
  315. var fi FileInfo
  316. entry, err := filer_pb.GetEntry(context.Background(), fs, fullpath)
  317. if err != nil {
  318. if err == filer_pb.ErrNotFound {
  319. return nil, os.ErrNotExist
  320. }
  321. fi.err = err
  322. return &fi, nil
  323. }
  324. if entry == nil {
  325. return nil, os.ErrNotExist
  326. }
  327. fi.size = int64(filer.FileSize(entry))
  328. fi.name = string(fullpath)
  329. fi.mode = os.FileMode(entry.Attributes.FileMode)
  330. fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
  331. fi.etag = filer.ETag(entry)
  332. fi.isDirectory = entry.IsDirectory
  333. if fi.name == "/" {
  334. fi.modifiedTime = time.Now()
  335. fi.isDirectory = true
  336. }
  337. return &fi, nil
  338. }
  339. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  340. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  341. return fs.stat(ctx, name)
  342. }
  343. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
  344. uploader, uploaderErr := operation.NewUploader()
  345. if uploaderErr != nil {
  346. glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
  347. return nil, fmt.Errorf("upload data: %w", uploaderErr)
  348. }
  349. fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
  350. f.fs,
  351. &filer_pb.AssignVolumeRequest{
  352. Count: 1,
  353. Replication: f.fs.option.Replication,
  354. Collection: f.fs.option.Collection,
  355. DiskType: f.fs.option.DiskType,
  356. Path: name,
  357. },
  358. &operation.UploadOption{
  359. Filename: f.name,
  360. Cipher: f.fs.option.Cipher,
  361. IsInputCompressed: false,
  362. MimeType: "",
  363. PairMap: nil,
  364. },
  365. func(host, fileId string) string {
  366. return fmt.Sprintf("http://%s/%s", host, fileId)
  367. },
  368. reader,
  369. )
  370. if flushErr != nil {
  371. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  372. return nil, fmt.Errorf("upload data: %w", flushErr)
  373. }
  374. if uploadResult.Error != "" {
  375. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  376. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  377. }
  378. return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
  379. }
  380. func (f *WebDavFile) Write(buf []byte) (int, error) {
  381. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  382. fullPath := util.FullPath(f.name)
  383. dir, _ := fullPath.DirAndName()
  384. var getErr error
  385. ctx := context.Background()
  386. if f.entry == nil {
  387. f.entry, getErr = filer_pb.GetEntry(context.Background(), f.fs, fullPath)
  388. }
  389. if f.entry == nil {
  390. return 0, getErr
  391. }
  392. if getErr != nil {
  393. return 0, getErr
  394. }
  395. if f.bufWriter.FlushFunc == nil {
  396. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  397. var chunk *filer_pb.FileChunk
  398. chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
  399. if flushErr != nil {
  400. if f.entry.Attributes.Mtime == 0 {
  401. if err := f.fs.removeAll(ctx, f.name); err != nil {
  402. glog.Errorf("bufWriter.Flush remove file error: %+v", f.name)
  403. }
  404. }
  405. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  406. }
  407. f.entry.Content = nil
  408. f.entry.Chunks = append(f.entry.GetChunks(), chunk)
  409. return flushErr
  410. }
  411. f.bufWriter.CloseFunc = func() error {
  412. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.GetChunks())
  413. if manifestErr != nil {
  414. // not good, but should be ok
  415. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  416. } else {
  417. f.entry.Chunks = manifestedChunks
  418. }
  419. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  420. f.entry.Attributes.Mtime = time.Now().Unix()
  421. request := &filer_pb.UpdateEntryRequest{
  422. Directory: dir,
  423. Entry: f.entry,
  424. Signatures: []int32{f.fs.signature},
  425. }
  426. if _, err := client.UpdateEntry(ctx, request); err != nil {
  427. return fmt.Errorf("update %s: %v", f.name, err)
  428. }
  429. return nil
  430. })
  431. return flushErr
  432. }
  433. }
  434. written, err := f.bufWriter.Write(buf)
  435. if err == nil {
  436. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  437. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  438. f.off += int64(written)
  439. }
  440. return written, err
  441. }
  442. func (f *WebDavFile) Close() error {
  443. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  444. if f.bufWriter == nil {
  445. return nil
  446. }
  447. err := f.bufWriter.Close()
  448. if f.entry != nil {
  449. f.entry = nil
  450. f.visibleIntervals = nil
  451. }
  452. return err
  453. }
  454. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  455. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  456. if f.entry == nil {
  457. f.entry, err = filer_pb.GetEntry(context.Background(), f.fs, util.FullPath(f.name))
  458. }
  459. if f.entry == nil {
  460. return 0, err
  461. }
  462. if err != nil {
  463. return 0, err
  464. }
  465. fileSize := int64(filer.FileSize(f.entry))
  466. if fileSize == 0 {
  467. return 0, io.EOF
  468. }
  469. if f.visibleIntervals == nil {
  470. f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(f.ctx, filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
  471. f.reader = nil
  472. }
  473. if f.reader == nil {
  474. chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
  475. f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize)
  476. }
  477. readSize, err = f.reader.ReadAt(p, f.off)
  478. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  479. f.off += int64(readSize)
  480. if err != nil && err != io.EOF {
  481. glog.Errorf("file read %s: %v", f.name, err)
  482. }
  483. return
  484. }
  485. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  486. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  487. dir, _ := util.FullPath(f.name).DirAndName()
  488. err = filer_pb.ReadDirAllEntries(context.Background(), f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  489. fi := FileInfo{
  490. size: int64(filer.FileSize(entry)),
  491. name: entry.Name,
  492. mode: os.FileMode(entry.Attributes.FileMode),
  493. modifiedTime: time.Unix(entry.Attributes.Mtime, 0),
  494. isDirectory: entry.IsDirectory,
  495. }
  496. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  497. fi.name += "/"
  498. }
  499. glog.V(4).Infof("entry: %v", fi.name)
  500. ret = append(ret, &fi)
  501. return nil
  502. })
  503. if err != nil {
  504. return nil, err
  505. }
  506. old := f.off
  507. if old >= int64(len(ret)) {
  508. if count > 0 {
  509. return nil, io.EOF
  510. }
  511. return nil, nil
  512. }
  513. if count > 0 {
  514. f.off += int64(count)
  515. if f.off > int64(len(ret)) {
  516. f.off = int64(len(ret))
  517. }
  518. } else {
  519. f.off = int64(len(ret))
  520. old = 0
  521. }
  522. return ret[old:f.off], nil
  523. }
  524. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  525. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  526. ctx := context.Background()
  527. var err error
  528. switch whence {
  529. case io.SeekStart:
  530. f.off = 0
  531. case io.SeekEnd:
  532. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  533. return 0, err
  534. } else {
  535. f.off = fi.Size()
  536. }
  537. }
  538. f.off += offset
  539. return f.off, err
  540. }
  541. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  542. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  543. ctx := context.Background()
  544. return f.fs.stat(ctx, f.name)
  545. }