filer_server_handlers_read.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package weed_server
  2. import (
  3. "encoding/base64"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "math"
  9. "mime"
  10. "net/http"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  16. "github.com/seaweedfs/seaweedfs/weed/security"
  17. "github.com/seaweedfs/seaweedfs/weed/filer"
  18. "github.com/seaweedfs/seaweedfs/weed/glog"
  19. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  20. "github.com/seaweedfs/seaweedfs/weed/stats"
  21. "github.com/seaweedfs/seaweedfs/weed/util"
  22. )
  23. // Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
  24. // Preconditions supported are:
  25. //
  26. // If-Modified-Since
  27. // If-Unmodified-Since
  28. // If-Match
  29. // If-None-Match
  30. func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Entry) bool {
  31. etag := filer.ETagEntry(entry)
  32. /// When more than one conditional request header field is present in a
  33. /// request, the order in which the fields are evaluated becomes
  34. /// important. In practice, the fields defined in this document are
  35. /// consistently implemented in a single, logical order, since "lost
  36. /// update" preconditions have more strict requirements than cache
  37. /// validation, a validated cache is more efficient than a partial
  38. /// response, and entity tags are presumed to be more accurate than date
  39. /// validators. https://tools.ietf.org/html/rfc7232#section-5
  40. if entry.Attr.Mtime.IsZero() {
  41. return false
  42. }
  43. w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
  44. ifMatchETagHeader := r.Header.Get("If-Match")
  45. ifUnmodifiedSinceHeader := r.Header.Get("If-Unmodified-Since")
  46. if ifMatchETagHeader != "" {
  47. if util.CanonicalizeETag(etag) != util.CanonicalizeETag(ifMatchETagHeader) {
  48. w.WriteHeader(http.StatusPreconditionFailed)
  49. return true
  50. }
  51. } else if ifUnmodifiedSinceHeader != "" {
  52. if t, parseError := time.Parse(http.TimeFormat, ifUnmodifiedSinceHeader); parseError == nil {
  53. if t.Before(entry.Attr.Mtime) {
  54. w.WriteHeader(http.StatusPreconditionFailed)
  55. return true
  56. }
  57. }
  58. }
  59. ifNoneMatchETagHeader := r.Header.Get("If-None-Match")
  60. ifModifiedSinceHeader := r.Header.Get("If-Modified-Since")
  61. if ifNoneMatchETagHeader != "" {
  62. if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) {
  63. SetEtag(w, etag)
  64. w.WriteHeader(http.StatusNotModified)
  65. return true
  66. }
  67. } else if ifModifiedSinceHeader != "" {
  68. if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil {
  69. if !t.Before(entry.Attr.Mtime) {
  70. SetEtag(w, etag)
  71. w.WriteHeader(http.StatusNotModified)
  72. return true
  73. }
  74. }
  75. }
  76. return false
  77. }
  78. func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
  79. ctx := r.Context()
  80. path := r.URL.Path
  81. isForDirectory := strings.HasSuffix(path, "/")
  82. if isForDirectory && len(path) > 1 {
  83. path = path[:len(path)-1]
  84. }
  85. entry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
  86. if err != nil {
  87. if path == "/" {
  88. fs.listDirectoryHandler(w, r)
  89. return
  90. }
  91. if err == filer_pb.ErrNotFound {
  92. glog.V(2).InfofCtx(ctx, "Not found %s: %v", path, err)
  93. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
  94. w.WriteHeader(http.StatusNotFound)
  95. } else {
  96. glog.ErrorfCtx(ctx, "Internal %s: %v", path, err)
  97. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
  98. w.WriteHeader(http.StatusInternalServerError)
  99. }
  100. return
  101. }
  102. query := r.URL.Query()
  103. if entry.IsDirectory() {
  104. if fs.option.DisableDirListing {
  105. w.WriteHeader(http.StatusForbidden)
  106. return
  107. }
  108. if query.Get("metadata") == "true" {
  109. writeJsonQuiet(w, r, http.StatusOK, entry)
  110. return
  111. }
  112. if entry.Attr.Mime == "" || (entry.Attr.Mime == s3_constants.FolderMimeType && r.Header.Get(s3_constants.AmzIdentityId) == "") {
  113. // Don't return directory meta if config value is set to true
  114. if fs.option.ExposeDirectoryData == false {
  115. writeJsonError(w, r, http.StatusForbidden, errors.New("directory listing is disabled"))
  116. return
  117. }
  118. // return index of directory for non s3 gateway
  119. fs.listDirectoryHandler(w, r)
  120. return
  121. }
  122. // inform S3 API this is a user created directory key object
  123. w.Header().Set(s3_constants.SeaweedFSIsDirectoryKey, "true")
  124. }
  125. if isForDirectory && entry.Attr.Mime != s3_constants.FolderMimeType {
  126. w.WriteHeader(http.StatusNotFound)
  127. return
  128. }
  129. if query.Get("metadata") == "true" {
  130. if query.Get("resolveManifest") == "true" {
  131. if entry.Chunks, _, err = filer.ResolveChunkManifest(
  132. ctx,
  133. fs.filer.MasterClient.GetLookupFileIdFunction(),
  134. entry.GetChunks(), 0, math.MaxInt64); err != nil {
  135. err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
  136. writeJsonError(w, r, http.StatusInternalServerError, err)
  137. return
  138. }
  139. }
  140. writeJsonQuiet(w, r, http.StatusOK, entry)
  141. return
  142. }
  143. if checkPreconditions(w, r, entry) {
  144. return
  145. }
  146. var etag string
  147. if partNumber, errNum := strconv.Atoi(r.Header.Get(s3_constants.SeaweedFSPartNumber)); errNum == nil {
  148. if len(entry.Chunks) < partNumber {
  149. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadChunk).Inc()
  150. w.WriteHeader(http.StatusBadRequest)
  151. w.Write([]byte("InvalidPart"))
  152. return
  153. }
  154. w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(len(entry.Chunks)))
  155. partChunk := entry.GetChunks()[partNumber-1]
  156. md5, _ := base64.StdEncoding.DecodeString(partChunk.ETag)
  157. etag = hex.EncodeToString(md5)
  158. r.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", partChunk.Offset, uint64(partChunk.Offset)+partChunk.Size-1))
  159. } else {
  160. etag = filer.ETagEntry(entry)
  161. }
  162. w.Header().Set("Accept-Ranges", "bytes")
  163. // mime type
  164. mimeType := entry.Attr.Mime
  165. if mimeType == "" {
  166. if ext := filepath.Ext(entry.Name()); ext != "" {
  167. mimeType = mime.TypeByExtension(ext)
  168. }
  169. }
  170. if mimeType != "" {
  171. w.Header().Set("Content-Type", mimeType)
  172. } else {
  173. w.Header().Set("Content-Type", "application/octet-stream")
  174. }
  175. // print out the header from extended properties
  176. for k, v := range entry.Extended {
  177. if !strings.HasPrefix(k, "xattr-") && !strings.HasPrefix(k, "x-seaweedfs-") {
  178. // "xattr-" prefix is set in filesys.XATTR_PREFIX
  179. // "x-seaweedfs-" prefix is for internal metadata that should not become HTTP headers
  180. w.Header().Set(k, string(v))
  181. }
  182. }
  183. //Seaweed custom header are not visible to Vue or javascript
  184. seaweedHeaders := []string{}
  185. for header := range w.Header() {
  186. if strings.HasPrefix(header, "Seaweed-") {
  187. seaweedHeaders = append(seaweedHeaders, header)
  188. }
  189. }
  190. seaweedHeaders = append(seaweedHeaders, "Content-Disposition")
  191. w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
  192. //set tag count
  193. tagCount := 0
  194. for k := range entry.Extended {
  195. if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") {
  196. tagCount++
  197. }
  198. }
  199. if tagCount > 0 {
  200. w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount))
  201. }
  202. // Set SSE metadata headers for S3 API consumption
  203. if sseIV, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists {
  204. // Convert binary IV to base64 for HTTP header
  205. ivBase64 := base64.StdEncoding.EncodeToString(sseIV)
  206. w.Header().Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64)
  207. }
  208. // Set SSE-C algorithm and key MD5 headers for S3 API response
  209. if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists {
  210. w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(sseAlgorithm))
  211. }
  212. if sseKeyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists {
  213. w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(sseKeyMD5))
  214. }
  215. if sseKMSKey, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
  216. // Convert binary KMS metadata to base64 for HTTP header
  217. kmsBase64 := base64.StdEncoding.EncodeToString(sseKMSKey)
  218. w.Header().Set(s3_constants.SeaweedFSSSEKMSKeyHeader, kmsBase64)
  219. }
  220. SetEtag(w, etag)
  221. filename := entry.Name()
  222. AdjustPassthroughHeaders(w, r, filename)
  223. // For range processing, use the original content size, not the encrypted size
  224. // entry.Size() returns max(chunk_sizes, file_size) where chunk_sizes include encryption overhead
  225. // For SSE objects, we need the original unencrypted size for proper range validation
  226. totalSize := int64(entry.FileSize)
  227. if r.Method == http.MethodHead {
  228. w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
  229. return
  230. }
  231. ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
  232. if offset+size <= int64(len(entry.Content)) {
  233. return func(writer io.Writer) error {
  234. _, err := writer.Write(entry.Content[offset : offset+size])
  235. if err != nil {
  236. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
  237. glog.ErrorfCtx(ctx, "failed to write entry content: %v", err)
  238. }
  239. return err
  240. }, nil
  241. }
  242. chunks := entry.GetChunks()
  243. if entry.IsInRemoteOnly() {
  244. dir, name := entry.FullPath.DirAndName()
  245. if resp, err := fs.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
  246. Directory: dir,
  247. Name: name,
  248. }); err != nil {
  249. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
  250. glog.ErrorfCtx(ctx, "CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
  251. return nil, fmt.Errorf("cache %s: %v", entry.FullPath, err)
  252. } else {
  253. chunks = resp.Entry.GetChunks()
  254. }
  255. }
  256. streamFn, err := filer.PrepareStreamContentWithThrottler(ctx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  257. if err != nil {
  258. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
  259. glog.ErrorfCtx(ctx, "failed to prepare stream content %s: %v", r.URL, err)
  260. return nil, err
  261. }
  262. return func(writer io.Writer) error {
  263. err := streamFn(writer)
  264. if err != nil {
  265. stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
  266. glog.ErrorfCtx(ctx, "failed to stream content %s: %v", r.URL, err)
  267. }
  268. return err
  269. }, nil
  270. })
  271. }
  272. func (fs *FilerServer) maybeGetVolumeReadJwtAuthorizationToken(fileId string) string {
  273. return string(security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId))
  274. }