s3api_object_handlers.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446
  1. package s3api
  2. import (
  3. "bytes"
  4. "encoding/base64"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/seaweedfs/seaweedfs/weed/filer"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  17. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  18. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  21. )
  22. // corsHeaders defines the CORS headers that need to be preserved
  23. // Package-level constant to avoid repeated allocations
  24. var corsHeaders = []string{
  25. "Access-Control-Allow-Origin",
  26. "Access-Control-Allow-Methods",
  27. "Access-Control-Allow-Headers",
  28. "Access-Control-Expose-Headers",
  29. "Access-Control-Max-Age",
  30. "Access-Control-Allow-Credentials",
  31. }
  32. func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
  33. mimeBuffer := make([]byte, 512)
  34. size, _ := dataReader.Read(mimeBuffer)
  35. if size > 0 {
  36. r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
  37. return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
  38. }
  39. return io.NopCloser(dataReader)
  40. }
  41. func urlEscapeObject(object string) string {
  42. t := urlPathEscape(removeDuplicateSlashes(object))
  43. if strings.HasPrefix(t, "/") {
  44. return t
  45. }
  46. return "/" + t
  47. }
  48. func entryUrlEncode(dir string, entry string, encodingTypeUrl bool) (dirName string, entryName string, prefix string) {
  49. if !encodingTypeUrl {
  50. return dir, entry, entry
  51. }
  52. return urlPathEscape(dir), url.QueryEscape(entry), urlPathEscape(entry)
  53. }
  54. func urlPathEscape(object string) string {
  55. var escapedParts []string
  56. for _, part := range strings.Split(object, "/") {
  57. escapedParts = append(escapedParts, strings.ReplaceAll(url.PathEscape(part), "+", "%2B"))
  58. }
  59. return strings.Join(escapedParts, "/")
  60. }
  61. func removeDuplicateSlashes(object string) string {
  62. result := strings.Builder{}
  63. result.Grow(len(object))
  64. isLastSlash := false
  65. for _, r := range object {
  66. switch r {
  67. case '/':
  68. if !isLastSlash {
  69. result.WriteRune(r)
  70. }
  71. isLastSlash = true
  72. default:
  73. result.WriteRune(r)
  74. isLastSlash = false
  75. }
  76. }
  77. return result.String()
  78. }
  79. // checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists
  80. // Returns: (entry, isDirectoryObject, error)
  81. // - entry: the directory entry if found and is a directory
  82. // - isDirectoryObject: true if the request was for a directory object (ends with "/")
  83. // - error: any error encountered while checking
  84. func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.Entry, bool, error) {
  85. if !strings.HasSuffix(object, "/") {
  86. return nil, false, nil // Not a directory object
  87. }
  88. bucketDir := s3a.option.BucketsPath + "/" + bucket
  89. cleanObject := strings.TrimSuffix(strings.TrimPrefix(object, "/"), "/")
  90. if cleanObject == "" {
  91. return nil, true, nil // Root level directory object, but we don't handle it
  92. }
  93. // Check if directory exists
  94. dirEntry, err := s3a.getEntry(bucketDir, cleanObject)
  95. if err != nil {
  96. if errors.Is(err, filer_pb.ErrNotFound) {
  97. return nil, true, nil // Directory object requested but doesn't exist
  98. }
  99. return nil, true, err // Other errors should be propagated
  100. }
  101. if !dirEntry.IsDirectory {
  102. return nil, true, nil // Exists but not a directory
  103. }
  104. return dirEntry, true, nil
  105. }
  106. // serveDirectoryContent serves the content of a directory object directly
  107. func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
  108. // Set content type - use stored MIME type or default
  109. contentType := entry.Attributes.Mime
  110. if contentType == "" {
  111. contentType = "application/octet-stream"
  112. }
  113. w.Header().Set("Content-Type", contentType)
  114. // Set content length - use FileSize for accuracy, especially for large files
  115. contentLength := int64(entry.Attributes.FileSize)
  116. w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
  117. // Set last modified
  118. w.Header().Set("Last-Modified", time.Unix(entry.Attributes.Mtime, 0).UTC().Format(http.TimeFormat))
  119. // Set ETag
  120. w.Header().Set("ETag", "\""+filer.ETag(entry)+"\"")
  121. // For HEAD requests, don't write body
  122. if r.Method == http.MethodHead {
  123. w.WriteHeader(http.StatusOK)
  124. return
  125. }
  126. // Write content
  127. w.WriteHeader(http.StatusOK)
  128. if len(entry.Content) > 0 {
  129. if _, err := w.Write(entry.Content); err != nil {
  130. glog.Errorf("serveDirectoryContent: failed to write response: %v", err)
  131. }
  132. }
  133. }
  134. // handleDirectoryObjectRequest is a helper function that handles directory object requests
  135. // for both GET and HEAD operations, eliminating code duplication
  136. func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) bool {
  137. // Check if this is a directory object and handle it directly
  138. if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
  139. glog.Errorf("%s: error checking directory object %s/%s: %v", handlerName, bucket, object, err)
  140. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  141. return true // Request was handled (with error)
  142. } else if dirEntry != nil {
  143. glog.V(2).Infof("%s: directory object %s/%s found, serving content", handlerName, bucket, object)
  144. s3a.serveDirectoryContent(w, r, dirEntry)
  145. return true // Request was handled successfully
  146. } else if isDirectoryObject {
  147. // Directory object but doesn't exist
  148. glog.V(2).Infof("%s: directory object %s/%s not found", handlerName, bucket, object)
  149. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  150. return true // Request was handled (with not found)
  151. }
  152. return false // Not a directory object, continue with normal processing
  153. }
  154. func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) {
  155. storageClass := "STANDARD"
  156. if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
  157. storageClass = string(v)
  158. }
  159. keyFormat := "%s/%s"
  160. if isDirectory {
  161. keyFormat += "/"
  162. }
  163. if key == "" {
  164. key = fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):]
  165. }
  166. if encodingTypeUrl {
  167. key = urlPathEscape(key)
  168. }
  169. listEntry = ListEntry{
  170. Key: key,
  171. LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
  172. ETag: "\"" + filer.ETag(entry) + "\"",
  173. Size: int64(filer.FileSize(entry)),
  174. StorageClass: StorageClass(storageClass),
  175. }
  176. if fetchOwner {
  177. // Extract owner from S3 metadata (Extended attributes) instead of file system attributes
  178. var ownerID, displayName string
  179. if entry.Extended != nil {
  180. if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  181. ownerID = string(ownerBytes)
  182. }
  183. }
  184. // Fallback to anonymous if no S3 owner found
  185. if ownerID == "" {
  186. ownerID = s3_constants.AccountAnonymousId
  187. displayName = "anonymous"
  188. } else {
  189. // Get the proper display name from IAM system
  190. displayName = iam.GetAccountNameById(ownerID)
  191. // Fallback to ownerID if no display name found
  192. if displayName == "" {
  193. displayName = ownerID
  194. }
  195. }
  196. listEntry.Owner = &CanonicalUser{
  197. ID: ownerID,
  198. DisplayName: displayName,
  199. }
  200. }
  201. return listEntry
  202. }
  203. func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
  204. object = urlPathEscape(removeDuplicateSlashes(object))
  205. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  206. s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
  207. return destUrl
  208. }
  209. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  210. bucket, object := s3_constants.GetBucketAndObject(r)
  211. glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
  212. // Handle directory objects with shared logic
  213. if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
  214. return // Directory object request was handled
  215. }
  216. // Check conditional headers for read operations
  217. result := s3a.checkConditionalHeadersForReads(r, bucket, object)
  218. if result.ErrorCode != s3err.ErrNone {
  219. glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
  220. // For 304 Not Modified responses, include the ETag header
  221. if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
  222. w.Header().Set("ETag", result.ETag)
  223. }
  224. s3err.WriteErrorResponse(w, r, result.ErrorCode)
  225. return
  226. }
  227. // Check for specific version ID in query parameters
  228. versionId := r.URL.Query().Get("versionId")
  229. // Check if versioning is configured for the bucket (Enabled or Suspended)
  230. versioningConfigured, err := s3a.isVersioningConfigured(bucket)
  231. if err != nil {
  232. if err == filer_pb.ErrNotFound {
  233. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  234. return
  235. }
  236. glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
  237. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  238. return
  239. }
  240. glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
  241. var destUrl string
  242. if versioningConfigured {
  243. // Handle versioned GET - all versions are stored in .versions directory
  244. var targetVersionId string
  245. var entry *filer_pb.Entry
  246. if versionId != "" {
  247. // Request for specific version
  248. glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
  249. entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
  250. if err != nil {
  251. glog.Errorf("Failed to get specific version %s: %v", versionId, err)
  252. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  253. return
  254. }
  255. targetVersionId = versionId
  256. } else {
  257. // Request for latest version
  258. glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
  259. entry, err = s3a.getLatestObjectVersion(bucket, object)
  260. if err != nil {
  261. glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
  262. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  263. return
  264. }
  265. if entry.Extended != nil {
  266. if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
  267. targetVersionId = string(versionIdBytes)
  268. }
  269. }
  270. // If no version ID found in entry, this is a pre-versioning object
  271. if targetVersionId == "" {
  272. targetVersionId = "null"
  273. }
  274. }
  275. // Check if this is a delete marker
  276. if entry.Extended != nil {
  277. if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
  278. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  279. return
  280. }
  281. }
  282. // Determine the actual file path based on whether this is a versioned or pre-versioning object
  283. if targetVersionId == "null" {
  284. // Pre-versioning object - stored as regular file
  285. destUrl = s3a.toFilerUrl(bucket, object)
  286. glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
  287. } else {
  288. // Versioned object - stored in .versions directory
  289. versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
  290. destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
  291. glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
  292. }
  293. // Set version ID in response header
  294. w.Header().Set("x-amz-version-id", targetVersionId)
  295. // Add object lock metadata to response headers if present
  296. s3a.addObjectLockHeadersToResponse(w, entry)
  297. } else {
  298. // Handle regular GET (non-versioned)
  299. destUrl = s3a.toFilerUrl(bucket, object)
  300. }
  301. // Check if this is a range request to an SSE object and modify the approach
  302. originalRangeHeader := r.Header.Get("Range")
  303. var sseObject = false
  304. // Pre-check if this object is SSE encrypted to avoid filer range conflicts
  305. if originalRangeHeader != "" {
  306. bucket, object := s3_constants.GetBucketAndObject(r)
  307. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  308. if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
  309. primarySSEType := s3a.detectPrimarySSEType(objectEntry)
  310. if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
  311. sseObject = true
  312. // Temporarily remove Range header to get full encrypted data from filer
  313. r.Header.Del("Range")
  314. }
  315. }
  316. }
  317. s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
  318. // Restore the original Range header for SSE processing
  319. if sseObject && originalRangeHeader != "" {
  320. r.Header.Set("Range", originalRangeHeader)
  321. }
  322. // Add SSE metadata headers based on object metadata before SSE processing
  323. bucket, object := s3_constants.GetBucketAndObject(r)
  324. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  325. if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
  326. s3a.addSSEHeadersToResponse(proxyResponse, objectEntry)
  327. }
  328. // Handle SSE decryption (both SSE-C and SSE-KMS) if needed
  329. return s3a.handleSSEResponse(r, proxyResponse, w)
  330. })
  331. }
  332. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  333. bucket, object := s3_constants.GetBucketAndObject(r)
  334. glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
  335. // Handle directory objects with shared logic
  336. if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
  337. return // Directory object request was handled
  338. }
  339. // Check conditional headers for read operations
  340. result := s3a.checkConditionalHeadersForReads(r, bucket, object)
  341. if result.ErrorCode != s3err.ErrNone {
  342. glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
  343. // For 304 Not Modified responses, include the ETag header
  344. if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
  345. w.Header().Set("ETag", result.ETag)
  346. }
  347. s3err.WriteErrorResponse(w, r, result.ErrorCode)
  348. return
  349. }
  350. // Check for specific version ID in query parameters
  351. versionId := r.URL.Query().Get("versionId")
  352. // Check if versioning is configured for the bucket (Enabled or Suspended)
  353. versioningConfigured, err := s3a.isVersioningConfigured(bucket)
  354. if err != nil {
  355. if err == filer_pb.ErrNotFound {
  356. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  357. return
  358. }
  359. glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
  360. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  361. return
  362. }
  363. var destUrl string
  364. if versioningConfigured {
  365. // Handle versioned HEAD - all versions are stored in .versions directory
  366. var targetVersionId string
  367. var entry *filer_pb.Entry
  368. if versionId != "" {
  369. // Request for specific version
  370. glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
  371. entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
  372. if err != nil {
  373. glog.Errorf("Failed to get specific version %s: %v", versionId, err)
  374. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  375. return
  376. }
  377. targetVersionId = versionId
  378. } else {
  379. // Request for latest version
  380. glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
  381. entry, err = s3a.getLatestObjectVersion(bucket, object)
  382. if err != nil {
  383. glog.Errorf("Failed to get latest version: %v", err)
  384. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  385. return
  386. }
  387. if entry.Extended != nil {
  388. if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
  389. targetVersionId = string(versionIdBytes)
  390. }
  391. }
  392. // If no version ID found in entry, this is a pre-versioning object
  393. if targetVersionId == "" {
  394. targetVersionId = "null"
  395. }
  396. }
  397. // Check if this is a delete marker
  398. if entry.Extended != nil {
  399. if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
  400. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  401. return
  402. }
  403. }
  404. // Determine the actual file path based on whether this is a versioned or pre-versioning object
  405. if targetVersionId == "null" {
  406. // Pre-versioning object - stored as regular file
  407. destUrl = s3a.toFilerUrl(bucket, object)
  408. glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
  409. } else {
  410. // Versioned object - stored in .versions directory
  411. versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
  412. destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
  413. glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
  414. }
  415. // Set version ID in response header
  416. w.Header().Set("x-amz-version-id", targetVersionId)
  417. // Add object lock metadata to response headers if present
  418. s3a.addObjectLockHeadersToResponse(w, entry)
  419. } else {
  420. // Handle regular HEAD (non-versioned)
  421. destUrl = s3a.toFilerUrl(bucket, object)
  422. }
  423. s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
  424. // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
  425. return s3a.handleSSEResponse(r, proxyResponse, w)
  426. })
  427. }
  428. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
  429. glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
  430. start := time.Now()
  431. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  432. if err != nil {
  433. glog.Errorf("NewRequest %s: %v", destUrl, err)
  434. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  435. return
  436. }
  437. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  438. proxyReq.Header.Set("Accept-Encoding", "identity")
  439. for k, v := range r.URL.Query() {
  440. if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
  441. proxyReq.Header[k] = v
  442. }
  443. if k == "partNumber" {
  444. proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
  445. }
  446. }
  447. for header, values := range r.Header {
  448. proxyReq.Header[header] = values
  449. }
  450. if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
  451. proxyReq.ContentLength = r.ContentLength
  452. }
  453. // ensure that the Authorization header is overriding any previous
  454. // Authorization header which might be already present in proxyReq
  455. s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
  456. resp, postErr := s3a.client.Do(proxyReq)
  457. if postErr != nil {
  458. glog.Errorf("post to filer: %v", postErr)
  459. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  460. return
  461. }
  462. defer util_http.CloseResponse(resp)
  463. if resp.StatusCode == http.StatusPreconditionFailed {
  464. s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
  465. return
  466. }
  467. if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
  468. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
  469. return
  470. }
  471. if r.Method == http.MethodDelete {
  472. if resp.StatusCode == http.StatusNotFound {
  473. // this is normal
  474. responseStatusCode, _ := responseFn(resp, w)
  475. s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
  476. return
  477. }
  478. }
  479. if resp.StatusCode == http.StatusNotFound {
  480. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  481. return
  482. }
  483. TimeToFirstByte(r.Method, start, r)
  484. if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
  485. responseStatusCode, _ := responseFn(resp, w)
  486. s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
  487. return
  488. }
  489. if resp.StatusCode == http.StatusInternalServerError {
  490. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  491. return
  492. }
  493. // when HEAD a directory, it should be reported as no such key
  494. // https://github.com/seaweedfs/seaweedfs/issues/3457
  495. if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
  496. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
  497. return
  498. }
  499. if resp.StatusCode == http.StatusBadRequest {
  500. resp_body, _ := io.ReadAll(resp.Body)
  501. switch string(resp_body) {
  502. case "InvalidPart":
  503. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  504. default:
  505. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
  506. }
  507. resp.Body.Close()
  508. return
  509. }
  510. setUserMetadataKeyToLowercase(resp)
  511. responseStatusCode, bytesTransferred := responseFn(resp, w)
  512. BucketTrafficSent(bytesTransferred, r)
  513. s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
  514. }
  515. func setUserMetadataKeyToLowercase(resp *http.Response) {
  516. for key, value := range resp.Header {
  517. if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
  518. resp.Header[strings.ToLower(key)] = value
  519. delete(resp.Header, key)
  520. }
  521. }
  522. }
  523. func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
  524. captured := make(map[string]string)
  525. for _, corsHeader := range headersToCapture {
  526. if value := w.Header().Get(corsHeader); value != "" {
  527. captured[corsHeader] = value
  528. }
  529. }
  530. return captured
  531. }
  532. func restoreCORSHeaders(w http.ResponseWriter, capturedCORSHeaders map[string]string) {
  533. for corsHeader, value := range capturedCORSHeaders {
  534. w.Header().Set(corsHeader, value)
  535. }
  536. }
  537. // writeFinalResponse handles the common response writing logic shared between
  538. // passThroughResponse and handleSSECResponse
  539. func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bodyReader io.Reader, capturedCORSHeaders map[string]string) (statusCode int, bytesTransferred int64) {
  540. // Restore CORS headers that were set by middleware
  541. restoreCORSHeaders(w, capturedCORSHeaders)
  542. if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
  543. statusCode = http.StatusPartialContent
  544. } else {
  545. statusCode = proxyResponse.StatusCode
  546. }
  547. w.WriteHeader(statusCode)
  548. // Stream response data
  549. buf := mem.Allocate(128 * 1024)
  550. defer mem.Free(buf)
  551. bytesTransferred, err := io.CopyBuffer(w, bodyReader, buf)
  552. if err != nil {
  553. glog.V(1).Infof("response read %d bytes: %v", bytesTransferred, err)
  554. }
  555. return statusCode, bytesTransferred
  556. }
  557. func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
  558. // Capture existing CORS headers that may have been set by middleware
  559. capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
  560. // Copy headers from proxy response
  561. for k, v := range proxyResponse.Header {
  562. w.Header()[k] = v
  563. }
  564. return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
  565. }
  566. // handleSSECResponse handles SSE-C decryption and response processing
  567. func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
  568. // Check if the object has SSE-C metadata
  569. sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
  570. sseKeyMD5 := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5)
  571. isObjectEncrypted := sseAlgorithm != "" && sseKeyMD5 != ""
  572. // Parse SSE-C headers from request once (avoid duplication)
  573. customerKey, err := ParseSSECHeaders(r)
  574. if err != nil {
  575. errCode := MapSSECErrorToS3Error(err)
  576. s3err.WriteErrorResponse(w, r, errCode)
  577. return http.StatusBadRequest, 0
  578. }
  579. if isObjectEncrypted {
  580. // This object was encrypted with SSE-C, validate customer key
  581. if customerKey == nil {
  582. s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
  583. return http.StatusBadRequest, 0
  584. }
  585. // SSE-C MD5 is base64 and case-sensitive
  586. if customerKey.KeyMD5 != sseKeyMD5 {
  587. // For GET/HEAD requests, AWS S3 returns 403 Forbidden for a key mismatch.
  588. s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
  589. return http.StatusForbidden, 0
  590. }
  591. // SSE-C encrypted objects support HTTP Range requests
  592. // The IV is stored in metadata and CTR mode allows seeking to any offset
  593. // Range requests will be handled by the filer layer with proper offset-based decryption
  594. // Check if this is a chunked or small content SSE-C object
  595. bucket, object := s3_constants.GetBucketAndObject(r)
  596. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  597. if entry, err := s3a.getEntry("", objectPath); err == nil {
  598. // Check for SSE-C chunks
  599. sseCChunks := 0
  600. for _, chunk := range entry.GetChunks() {
  601. if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
  602. sseCChunks++
  603. }
  604. }
  605. if sseCChunks >= 1 {
  606. // Handle chunked SSE-C objects - each chunk needs independent decryption
  607. multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
  608. if decErr != nil {
  609. glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
  610. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  611. return http.StatusInternalServerError, 0
  612. }
  613. // Capture existing CORS headers
  614. capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
  615. // Copy headers from proxy response
  616. for k, v := range proxyResponse.Header {
  617. w.Header()[k] = v
  618. }
  619. // Set proper headers for range requests
  620. rangeHeader := r.Header.Get("Range")
  621. if rangeHeader != "" {
  622. // Parse range header (e.g., "bytes=0-99")
  623. if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
  624. rangeSpec := rangeHeader[6:]
  625. parts := strings.Split(rangeSpec, "-")
  626. if len(parts) == 2 {
  627. startOffset, endOffset := int64(0), int64(-1)
  628. if parts[0] != "" {
  629. startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
  630. }
  631. if parts[1] != "" {
  632. endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
  633. }
  634. if endOffset >= startOffset {
  635. // Specific range - set proper Content-Length and Content-Range headers
  636. rangeLength := endOffset - startOffset + 1
  637. totalSize := proxyResponse.Header.Get("Content-Length")
  638. w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10))
  639. w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize))
  640. // writeFinalResponse will set status to 206 if Content-Range is present
  641. }
  642. }
  643. }
  644. }
  645. return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders)
  646. } else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 {
  647. // Small content SSE-C object stored directly in entry.Content
  648. // Fall through to traditional single-object SSE-C handling below
  649. }
  650. }
  651. // Single-part SSE-C object: Get IV from proxy response headers (stored during upload)
  652. ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader)
  653. if ivBase64 == "" {
  654. glog.Errorf("SSE-C encrypted single-part object missing IV in metadata")
  655. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  656. return http.StatusInternalServerError, 0
  657. }
  658. iv, err := base64.StdEncoding.DecodeString(ivBase64)
  659. if err != nil {
  660. glog.Errorf("Failed to decode IV from metadata: %v", err)
  661. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  662. return http.StatusInternalServerError, 0
  663. }
  664. // Create decrypted reader with IV from metadata
  665. decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv)
  666. if decErr != nil {
  667. glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr)
  668. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  669. return http.StatusInternalServerError, 0
  670. }
  671. // Capture existing CORS headers that may have been set by middleware
  672. capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
  673. // Copy headers from proxy response (excluding body-related headers that might change)
  674. for k, v := range proxyResponse.Header {
  675. if k != "Content-Length" && k != "Content-Encoding" {
  676. w.Header()[k] = v
  677. }
  678. }
  679. // Set correct Content-Length for SSE-C (only for full object requests)
  680. // With IV stored in metadata, the encrypted length equals the original length
  681. if proxyResponse.Header.Get("Content-Range") == "" {
  682. // Full object request: encrypted length equals original length (IV not in stream)
  683. if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
  684. // Content-Length is already correct since IV is stored in metadata, not in data stream
  685. w.Header().Set("Content-Length", contentLengthStr)
  686. }
  687. }
  688. // For range requests, let the actual bytes transferred determine the response length
  689. // Add SSE-C response headers
  690. w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, sseAlgorithm)
  691. w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, sseKeyMD5)
  692. return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
  693. } else {
  694. // Object is not encrypted, but check if customer provided SSE-C headers unnecessarily
  695. if customerKey != nil {
  696. s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyNotNeeded)
  697. return http.StatusBadRequest, 0
  698. }
  699. // Normal pass-through response
  700. return passThroughResponse(proxyResponse, w)
  701. }
  702. }
  703. // handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
  704. func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
  705. // Check what the client is expecting based on request headers
  706. clientExpectsSSEC := IsSSECRequest(r)
  707. // Check what the stored object has in headers (may be conflicting after copy)
  708. kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
  709. sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
  710. // Get actual object state by examining chunks (most reliable for cross-encryption)
  711. bucket, object := s3_constants.GetBucketAndObject(r)
  712. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  713. actualObjectType := "Unknown"
  714. if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
  715. actualObjectType = s3a.detectPrimarySSEType(objectEntry)
  716. }
  717. // Route based on ACTUAL object type (from chunks) rather than conflicting headers
  718. if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
  719. // Object is SSE-C and client expects SSE-C → SSE-C handler
  720. return s3a.handleSSECResponse(r, proxyResponse, w)
  721. } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
  722. // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
  723. return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
  724. } else if actualObjectType == "None" && !clientExpectsSSEC {
  725. // Object is unencrypted and client doesn't expect SSE-C → pass through
  726. return passThroughResponse(proxyResponse, w)
  727. } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
  728. // Object is SSE-C but client doesn't provide SSE-C headers → Error
  729. s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
  730. return http.StatusBadRequest, 0
  731. } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
  732. // Object is SSE-KMS but client provides SSE-C headers → Error
  733. s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
  734. return http.StatusBadRequest, 0
  735. } else if actualObjectType == "None" && clientExpectsSSEC {
  736. // Object is unencrypted but client provides SSE-C headers → Error
  737. s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
  738. return http.StatusBadRequest, 0
  739. }
  740. // Fallback for edge cases - use original logic with header-based detection
  741. if clientExpectsSSEC && sseAlgorithm != "" {
  742. return s3a.handleSSECResponse(r, proxyResponse, w)
  743. } else if !clientExpectsSSEC && kmsMetadataHeader != "" {
  744. return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
  745. } else {
  746. return passThroughResponse(proxyResponse, w)
  747. }
  748. }
  749. // handleSSEKMSResponse handles SSE-KMS decryption and response processing
  750. func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
  751. // Deserialize SSE-KMS metadata
  752. kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
  753. if err != nil {
  754. glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
  755. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  756. return http.StatusInternalServerError, 0
  757. }
  758. sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
  759. if err != nil {
  760. glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
  761. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  762. return http.StatusInternalServerError, 0
  763. }
  764. // For HEAD requests, we don't need to decrypt the body, just add response headers
  765. if r.Method == "HEAD" {
  766. // Capture existing CORS headers that may have been set by middleware
  767. capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
  768. // Copy headers from proxy response
  769. for k, v := range proxyResponse.Header {
  770. w.Header()[k] = v
  771. }
  772. // Add SSE-KMS response headers
  773. AddSSEKMSResponseHeaders(w, sseKMSKey)
  774. return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
  775. }
  776. // For GET requests, check if this is a multipart SSE-KMS object
  777. // We need to check the object structure to determine if it's multipart encrypted
  778. isMultipartSSEKMS := false
  779. if sseKMSKey != nil {
  780. // Get the object entry to check chunk structure
  781. bucket, object := s3_constants.GetBucketAndObject(r)
  782. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  783. if entry, err := s3a.getEntry("", objectPath); err == nil {
  784. // Check for multipart SSE-KMS
  785. sseKMSChunks := 0
  786. for _, chunk := range entry.GetChunks() {
  787. if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
  788. sseKMSChunks++
  789. }
  790. }
  791. isMultipartSSEKMS = sseKMSChunks > 1
  792. glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t",
  793. len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
  794. }
  795. }
  796. var decryptedReader io.Reader
  797. if isMultipartSSEKMS {
  798. // Handle multipart SSE-KMS objects - each chunk needs independent decryption
  799. multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
  800. if decErr != nil {
  801. glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
  802. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  803. return http.StatusInternalServerError, 0
  804. }
  805. decryptedReader = multipartReader
  806. glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
  807. } else {
  808. // Handle single-part SSE-KMS objects
  809. singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
  810. if decErr != nil {
  811. glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
  812. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  813. return http.StatusInternalServerError, 0
  814. }
  815. decryptedReader = singlePartReader
  816. glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
  817. }
  818. // Capture existing CORS headers that may have been set by middleware
  819. capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
  820. // Copy headers from proxy response (excluding body-related headers that might change)
  821. for k, v := range proxyResponse.Header {
  822. if k != "Content-Length" && k != "Content-Encoding" {
  823. w.Header()[k] = v
  824. }
  825. }
  826. // Set correct Content-Length for SSE-KMS
  827. if proxyResponse.Header.Get("Content-Range") == "" {
  828. // For full object requests, encrypted length equals original length
  829. if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
  830. w.Header().Set("Content-Length", contentLengthStr)
  831. }
  832. }
  833. // Add SSE-KMS response headers
  834. AddSSEKMSResponseHeaders(w, sseKMSKey)
  835. return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
  836. }
  837. // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
  838. // and adds the appropriate S3 headers to the response
  839. func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
  840. if entry == nil || entry.Extended == nil {
  841. return
  842. }
  843. // Check if this entry has any object lock metadata (indicating it's from an object lock enabled bucket)
  844. hasObjectLockMode := false
  845. hasRetentionDate := false
  846. // Add object lock mode header if present
  847. if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists && len(modeBytes) > 0 {
  848. w.Header().Set(s3_constants.AmzObjectLockMode, string(modeBytes))
  849. hasObjectLockMode = true
  850. }
  851. // Add retention until date header if present
  852. if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists && len(dateBytes) > 0 {
  853. dateStr := string(dateBytes)
  854. // Convert Unix timestamp to ISO8601 format for S3 compatibility
  855. if timestamp, err := strconv.ParseInt(dateStr, 10, 64); err == nil {
  856. retainUntilDate := time.Unix(timestamp, 0).UTC()
  857. w.Header().Set(s3_constants.AmzObjectLockRetainUntilDate, retainUntilDate.Format(time.RFC3339))
  858. hasRetentionDate = true
  859. } else {
  860. glog.Errorf("addObjectLockHeadersToResponse: failed to parse retention until date from stored metadata (dateStr: %s): %v", dateStr, err)
  861. }
  862. }
  863. // Add legal hold header - AWS S3 behavior: always include legal hold for object lock enabled buckets
  864. if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists && len(legalHoldBytes) > 0 {
  865. // Return stored S3 standard "ON"/"OFF" values directly
  866. w.Header().Set(s3_constants.AmzObjectLockLegalHold, string(legalHoldBytes))
  867. } else if hasObjectLockMode || hasRetentionDate {
  868. // If this entry has object lock metadata (indicating object lock enabled bucket)
  869. // but no legal hold specifically set, default to "OFF" as per AWS S3 behavior
  870. w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff)
  871. }
  872. }
  873. // addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers
  874. // Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts
  875. func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) {
  876. if entry == nil || entry.Extended == nil {
  877. return
  878. }
  879. // Determine the primary encryption type by examining chunks (most reliable)
  880. primarySSEType := s3a.detectPrimarySSEType(entry)
  881. // Only set headers for the PRIMARY encryption type
  882. switch primarySSEType {
  883. case s3_constants.SSETypeC:
  884. // Add only SSE-C headers
  885. if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 {
  886. proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes))
  887. }
  888. if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 {
  889. proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes))
  890. }
  891. if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 {
  892. ivBase64 := base64.StdEncoding.EncodeToString(ivBytes)
  893. proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64)
  894. }
  895. case s3_constants.SSETypeKMS:
  896. // Add only SSE-KMS headers
  897. if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 {
  898. proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm))
  899. }
  900. if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 {
  901. proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID))
  902. }
  903. default:
  904. // Unencrypted or unknown - don't set any SSE headers
  905. }
  906. glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended))
  907. }
  908. // detectPrimarySSEType determines the primary SSE type by examining chunk metadata
  909. func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
  910. if len(entry.GetChunks()) == 0 {
  911. // No chunks - check object-level metadata only (single objects or smallContent)
  912. hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
  913. hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil
  914. if hasSSEC && !hasSSEKMS {
  915. return s3_constants.SSETypeC
  916. } else if hasSSEKMS && !hasSSEC {
  917. return s3_constants.SSETypeKMS
  918. } else if hasSSEC && hasSSEKMS {
  919. // Both present - this should only happen during cross-encryption copies
  920. // Use content to determine actual encryption state
  921. if len(entry.Content) > 0 {
  922. // smallContent - check if it's encrypted (heuristic: random-looking data)
  923. return s3_constants.SSETypeC // Default to SSE-C for mixed case
  924. } else {
  925. // No content, both headers - default to SSE-C
  926. return s3_constants.SSETypeC
  927. }
  928. }
  929. return "None"
  930. }
  931. // Count chunk types to determine primary (multipart objects)
  932. ssecChunks := 0
  933. ssekmsChunks := 0
  934. for _, chunk := range entry.GetChunks() {
  935. switch chunk.GetSseType() {
  936. case filer_pb.SSEType_SSE_C:
  937. ssecChunks++
  938. case filer_pb.SSEType_SSE_KMS:
  939. ssekmsChunks++
  940. }
  941. }
  942. // Primary type is the one with more chunks
  943. if ssecChunks > ssekmsChunks {
  944. return s3_constants.SSETypeC
  945. } else if ssekmsChunks > ssecChunks {
  946. return s3_constants.SSETypeKMS
  947. } else if ssecChunks > 0 {
  948. // Equal number, prefer SSE-C (shouldn't happen in practice)
  949. return s3_constants.SSETypeC
  950. }
  951. return "None"
  952. }
  953. // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
  954. func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
  955. // Get the object path from the request
  956. bucket, object := s3_constants.GetBucketAndObject(r)
  957. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  958. // Get the object entry from filer to access chunk information
  959. entry, err := s3a.getEntry("", objectPath)
  960. if err != nil {
  961. return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
  962. }
  963. // Sort chunks by offset to ensure correct order
  964. chunks := entry.GetChunks()
  965. sort.Slice(chunks, func(i, j int) bool {
  966. return chunks[i].GetOffset() < chunks[j].GetOffset()
  967. })
  968. // Create readers for each chunk, decrypting them independently
  969. var readers []io.Reader
  970. for i, chunk := range chunks {
  971. glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d",
  972. i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType())
  973. // Get this chunk's encrypted data
  974. chunkReader, err := s3a.createEncryptedChunkReader(chunk)
  975. if err != nil {
  976. return nil, fmt.Errorf("failed to create chunk reader: %v", err)
  977. }
  978. // Get SSE-KMS metadata for this chunk
  979. var chunkSSEKMSKey *SSEKMSKey
  980. // Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
  981. if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
  982. // Use the per-chunk SSE-KMS metadata
  983. kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
  984. if err != nil {
  985. glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
  986. } else {
  987. // ChunkOffset is already set from the stored metadata (PartOffset)
  988. chunkSSEKMSKey = kmsKey
  989. glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d",
  990. chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset)
  991. }
  992. }
  993. // Fallback to object-level metadata (legacy support)
  994. if chunkSSEKMSKey == nil {
  995. objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
  996. if objectMetadataHeader != "" {
  997. kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader)
  998. if decodeErr == nil {
  999. kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes)
  1000. if kmsKey != nil {
  1001. // For object-level metadata (legacy), use absolute file offset as fallback
  1002. kmsKey.ChunkOffset = chunk.GetOffset()
  1003. chunkSSEKMSKey = kmsKey
  1004. }
  1005. glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset())
  1006. }
  1007. }
  1008. }
  1009. if chunkSSEKMSKey == nil {
  1010. return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
  1011. }
  1012. // Create decrypted reader for this chunk
  1013. decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
  1014. if decErr != nil {
  1015. chunkReader.Close() // Close the chunk reader if decryption fails
  1016. return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
  1017. }
  1018. // Use the streaming decrypted reader directly instead of reading into memory
  1019. readers = append(readers, decryptedChunkReader)
  1020. glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
  1021. }
  1022. // Combine all decrypted chunk readers into a single stream with proper resource management
  1023. multiReader := NewMultipartSSEReader(readers)
  1024. glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
  1025. return multiReader, nil
  1026. }
  1027. // createEncryptedChunkReader creates a reader for a single encrypted chunk
  1028. func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
  1029. // Get chunk URL
  1030. srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
  1031. if err != nil {
  1032. return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
  1033. }
  1034. // Create HTTP request for chunk data
  1035. req, err := http.NewRequest("GET", srcUrl, nil)
  1036. if err != nil {
  1037. return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
  1038. }
  1039. // Execute request
  1040. resp, err := http.DefaultClient.Do(req)
  1041. if err != nil {
  1042. return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
  1043. }
  1044. if resp.StatusCode != http.StatusOK {
  1045. resp.Body.Close()
  1046. return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode)
  1047. }
  1048. return resp.Body, nil
  1049. }
// MultipartSSEReader presents several readers as one sequential stream and remembers
// the individual readers, so that Close can close those that implement io.Closer.
// In this file it combines one (decrypted) reader per chunk of a multipart object;
// construct it with NewMultipartSSEReader.
type MultipartSSEReader struct {
	// multiReader concatenates readers in order; Read delegates to it.
	multiReader io.Reader
	// readers are the original inputs, kept so Close can reach them.
	readers []io.Reader
}
// SSERangeReader exposes a byte window of an underlying stream. It first discards
// offset bytes, then yields at most remaining bytes.
type SSERangeReader struct {
	reader    io.Reader
	offset    int64 // bytes to skip from the beginning of reader
	remaining int64 // bytes remaining to read; -1 (any negative value) means unlimited
	skipped   int64 // bytes already skipped
}
  1062. // NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
  1063. func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader {
  1064. return &MultipartSSEReader{
  1065. multiReader: io.MultiReader(readers...),
  1066. readers: readers,
  1067. }
  1068. }
  1069. // Read implements the io.Reader interface
  1070. func (m *MultipartSSEReader) Read(p []byte) (n int, err error) {
  1071. return m.multiReader.Read(p)
  1072. }
  1073. // Close implements the io.Closer interface and closes all underlying readers that support closing
  1074. func (m *MultipartSSEReader) Close() error {
  1075. var lastErr error
  1076. for i, reader := range m.readers {
  1077. if closer, ok := reader.(io.Closer); ok {
  1078. if err := closer.Close(); err != nil {
  1079. glog.V(2).Infof("Error closing reader %d: %v", i, err)
  1080. lastErr = err // Keep track of the last error, but continue closing others
  1081. }
  1082. }
  1083. }
  1084. return lastErr
  1085. }
  1086. // Read implements the io.Reader interface for SSERangeReader
  1087. func (r *SSERangeReader) Read(p []byte) (n int, err error) {
  1088. // If we need to skip bytes and haven't skipped enough yet
  1089. if r.skipped < r.offset {
  1090. skipNeeded := r.offset - r.skipped
  1091. skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
  1092. skipRead, skipErr := r.reader.Read(skipBuf)
  1093. r.skipped += int64(skipRead)
  1094. if skipErr != nil {
  1095. return 0, skipErr
  1096. }
  1097. // If we still need to skip more, recurse
  1098. if r.skipped < r.offset {
  1099. return r.Read(p)
  1100. }
  1101. }
  1102. // If we have a remaining limit and it's reached
  1103. if r.remaining == 0 {
  1104. return 0, io.EOF
  1105. }
  1106. // Calculate how much to read
  1107. readSize := len(p)
  1108. if r.remaining > 0 && int64(readSize) > r.remaining {
  1109. readSize = int(r.remaining)
  1110. }
  1111. // Read the data
  1112. n, err = r.reader.Read(p[:readSize])
  1113. if r.remaining > 0 {
  1114. r.remaining -= int64(n)
  1115. }
  1116. return n, err
  1117. }
  1118. // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
  1119. // Each chunk has its own IV and encryption key from the original multipart parts
  1120. func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
  1121. // Parse SSE-C headers from the request for decryption key
  1122. customerKey, err := ParseSSECHeaders(r)
  1123. if err != nil {
  1124. return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
  1125. }
  1126. // Get the object path from the request
  1127. bucket, object := s3_constants.GetBucketAndObject(r)
  1128. objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  1129. // Get the object entry from filer to access chunk information
  1130. entry, err := s3a.getEntry("", objectPath)
  1131. if err != nil {
  1132. return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
  1133. }
  1134. // Sort chunks by offset to ensure correct order
  1135. chunks := entry.GetChunks()
  1136. sort.Slice(chunks, func(i, j int) bool {
  1137. return chunks[i].GetOffset() < chunks[j].GetOffset()
  1138. })
  1139. // Check for Range header to optimize chunk processing
  1140. var startOffset, endOffset int64 = 0, -1
  1141. rangeHeader := r.Header.Get("Range")
  1142. if rangeHeader != "" {
  1143. // Parse range header (e.g., "bytes=0-99")
  1144. if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
  1145. rangeSpec := rangeHeader[6:]
  1146. parts := strings.Split(rangeSpec, "-")
  1147. if len(parts) == 2 {
  1148. if parts[0] != "" {
  1149. startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
  1150. }
  1151. if parts[1] != "" {
  1152. endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
  1153. }
  1154. }
  1155. }
  1156. }
  1157. // Filter chunks to only those needed for the range request
  1158. var neededChunks []*filer_pb.FileChunk
  1159. for _, chunk := range chunks {
  1160. chunkStart := chunk.GetOffset()
  1161. chunkEnd := chunkStart + int64(chunk.GetSize()) - 1
  1162. // Check if this chunk overlaps with the requested range
  1163. if endOffset == -1 {
  1164. // No end specified, take all chunks from startOffset
  1165. if chunkEnd >= startOffset {
  1166. neededChunks = append(neededChunks, chunk)
  1167. }
  1168. } else {
  1169. // Specific range: check for overlap
  1170. if chunkStart <= endOffset && chunkEnd >= startOffset {
  1171. neededChunks = append(neededChunks, chunk)
  1172. }
  1173. }
  1174. }
  1175. // Create readers for only the needed chunks
  1176. var readers []io.Reader
  1177. for _, chunk := range neededChunks {
  1178. // Get this chunk's encrypted data
  1179. chunkReader, err := s3a.createEncryptedChunkReader(chunk)
  1180. if err != nil {
  1181. return nil, fmt.Errorf("failed to create chunk reader: %v", err)
  1182. }
  1183. if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
  1184. // For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach)
  1185. if len(chunk.GetSseMetadata()) > 0 {
  1186. // Deserialize the SSE-C metadata stored in the unified metadata field
  1187. ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseMetadata())
  1188. if decErr != nil {
  1189. return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr)
  1190. }
  1191. // Decode the IV from the metadata
  1192. iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV)
  1193. if ivErr != nil {
  1194. return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
  1195. }
  1196. // Calculate the correct IV for this chunk using within-part offset
  1197. var chunkIV []byte
  1198. if ssecMetadata.PartOffset > 0 {
  1199. chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
  1200. } else {
  1201. chunkIV = iv
  1202. }
  1203. decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
  1204. if decErr != nil {
  1205. return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr)
  1206. }
  1207. readers = append(readers, decryptedReader)
  1208. glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString())
  1209. } else {
  1210. return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString())
  1211. }
  1212. } else {
  1213. // Non-SSE-C chunk, use as-is
  1214. readers = append(readers, chunkReader)
  1215. }
  1216. }
  1217. multiReader := NewMultipartSSEReader(readers)
  1218. // Apply range logic if a range was requested
  1219. if rangeHeader != "" && startOffset >= 0 {
  1220. if endOffset == -1 {
  1221. // Open-ended range (e.g., "bytes=100-")
  1222. return &SSERangeReader{
  1223. reader: multiReader,
  1224. offset: startOffset,
  1225. remaining: -1, // Read until EOF
  1226. }, nil
  1227. } else {
  1228. // Specific range (e.g., "bytes=0-99")
  1229. rangeLength := endOffset - startOffset + 1
  1230. return &SSERangeReader{
  1231. reader: multiReader,
  1232. offset: startOffset,
  1233. remaining: rangeLength,
  1234. }, nil
  1235. }
  1236. }
  1237. return multiReader, nil
  1238. }