| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446 |
- package s3api
- import (
- "bytes"
- "encoding/base64"
- "errors"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "sort"
- "strconv"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "github.com/seaweedfs/seaweedfs/weed/util/mem"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- )
- // corsHeaders defines the CORS headers that need to be preserved
- // Package-level constant to avoid repeated allocations
- var corsHeaders = []string{
- "Access-Control-Allow-Origin",
- "Access-Control-Allow-Methods",
- "Access-Control-Allow-Headers",
- "Access-Control-Expose-Headers",
- "Access-Control-Max-Age",
- "Access-Control-Allow-Credentials",
- }
- func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
- mimeBuffer := make([]byte, 512)
- size, _ := dataReader.Read(mimeBuffer)
- if size > 0 {
- r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
- return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
- }
- return io.NopCloser(dataReader)
- }
- func urlEscapeObject(object string) string {
- t := urlPathEscape(removeDuplicateSlashes(object))
- if strings.HasPrefix(t, "/") {
- return t
- }
- return "/" + t
- }
- func entryUrlEncode(dir string, entry string, encodingTypeUrl bool) (dirName string, entryName string, prefix string) {
- if !encodingTypeUrl {
- return dir, entry, entry
- }
- return urlPathEscape(dir), url.QueryEscape(entry), urlPathEscape(entry)
- }
- func urlPathEscape(object string) string {
- var escapedParts []string
- for _, part := range strings.Split(object, "/") {
- escapedParts = append(escapedParts, strings.ReplaceAll(url.PathEscape(part), "+", "%2B"))
- }
- return strings.Join(escapedParts, "/")
- }
- func removeDuplicateSlashes(object string) string {
- result := strings.Builder{}
- result.Grow(len(object))
- isLastSlash := false
- for _, r := range object {
- switch r {
- case '/':
- if !isLastSlash {
- result.WriteRune(r)
- }
- isLastSlash = true
- default:
- result.WriteRune(r)
- isLastSlash = false
- }
- }
- return result.String()
- }
- // checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists
- // Returns: (entry, isDirectoryObject, error)
- // - entry: the directory entry if found and is a directory
- // - isDirectoryObject: true if the request was for a directory object (ends with "/")
- // - error: any error encountered while checking
- func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.Entry, bool, error) {
- if !strings.HasSuffix(object, "/") {
- return nil, false, nil // Not a directory object
- }
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- cleanObject := strings.TrimSuffix(strings.TrimPrefix(object, "/"), "/")
- if cleanObject == "" {
- return nil, true, nil // Root level directory object, but we don't handle it
- }
- // Check if directory exists
- dirEntry, err := s3a.getEntry(bucketDir, cleanObject)
- if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- return nil, true, nil // Directory object requested but doesn't exist
- }
- return nil, true, err // Other errors should be propagated
- }
- if !dirEntry.IsDirectory {
- return nil, true, nil // Exists but not a directory
- }
- return dirEntry, true, nil
- }
- // serveDirectoryContent serves the content of a directory object directly
- func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
- // Set content type - use stored MIME type or default
- contentType := entry.Attributes.Mime
- if contentType == "" {
- contentType = "application/octet-stream"
- }
- w.Header().Set("Content-Type", contentType)
- // Set content length - use FileSize for accuracy, especially for large files
- contentLength := int64(entry.Attributes.FileSize)
- w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
- // Set last modified
- w.Header().Set("Last-Modified", time.Unix(entry.Attributes.Mtime, 0).UTC().Format(http.TimeFormat))
- // Set ETag
- w.Header().Set("ETag", "\""+filer.ETag(entry)+"\"")
- // For HEAD requests, don't write body
- if r.Method == http.MethodHead {
- w.WriteHeader(http.StatusOK)
- return
- }
- // Write content
- w.WriteHeader(http.StatusOK)
- if len(entry.Content) > 0 {
- if _, err := w.Write(entry.Content); err != nil {
- glog.Errorf("serveDirectoryContent: failed to write response: %v", err)
- }
- }
- }
- // handleDirectoryObjectRequest is a helper function that handles directory object requests
- // for both GET and HEAD operations, eliminating code duplication
- func (s3a *S3ApiServer) handleDirectoryObjectRequest(w http.ResponseWriter, r *http.Request, bucket, object, handlerName string) bool {
- // Check if this is a directory object and handle it directly
- if dirEntry, isDirectoryObject, err := s3a.checkDirectoryObject(bucket, object); err != nil {
- glog.Errorf("%s: error checking directory object %s/%s: %v", handlerName, bucket, object, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return true // Request was handled (with error)
- } else if dirEntry != nil {
- glog.V(2).Infof("%s: directory object %s/%s found, serving content", handlerName, bucket, object)
- s3a.serveDirectoryContent(w, r, dirEntry)
- return true // Request was handled successfully
- } else if isDirectoryObject {
- // Directory object but doesn't exist
- glog.V(2).Infof("%s: directory object %s/%s not found", handlerName, bucket, object)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return true // Request was handled (with not found)
- }
- return false // Not a directory object, continue with normal processing
- }
- func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bucketPrefix string, fetchOwner bool, isDirectory bool, encodingTypeUrl bool, iam AccountManager) (listEntry ListEntry) {
- storageClass := "STANDARD"
- if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
- storageClass = string(v)
- }
- keyFormat := "%s/%s"
- if isDirectory {
- keyFormat += "/"
- }
- if key == "" {
- key = fmt.Sprintf(keyFormat, dir, name)[len(bucketPrefix):]
- }
- if encodingTypeUrl {
- key = urlPathEscape(key)
- }
- listEntry = ListEntry{
- Key: key,
- LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
- ETag: "\"" + filer.ETag(entry) + "\"",
- Size: int64(filer.FileSize(entry)),
- StorageClass: StorageClass(storageClass),
- }
- if fetchOwner {
- // Extract owner from S3 metadata (Extended attributes) instead of file system attributes
- var ownerID, displayName string
- if entry.Extended != nil {
- if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
- ownerID = string(ownerBytes)
- }
- }
- // Fallback to anonymous if no S3 owner found
- if ownerID == "" {
- ownerID = s3_constants.AccountAnonymousId
- displayName = "anonymous"
- } else {
- // Get the proper display name from IAM system
- displayName = iam.GetAccountNameById(ownerID)
- // Fallback to ownerID if no display name found
- if displayName == "" {
- displayName = ownerID
- }
- }
- listEntry.Owner = &CanonicalUser{
- ID: ownerID,
- DisplayName: displayName,
- }
- }
- return listEntry
- }
- func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
- object = urlPathEscape(removeDuplicateSlashes(object))
- destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
- return destUrl
- }
- func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := s3_constants.GetBucketAndObject(r)
- glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
- // Handle directory objects with shared logic
- if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
- return // Directory object request was handled
- }
- // Check conditional headers for read operations
- result := s3a.checkConditionalHeadersForReads(r, bucket, object)
- if result.ErrorCode != s3err.ErrNone {
- glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
- // For 304 Not Modified responses, include the ETag header
- if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
- w.Header().Set("ETag", result.ETag)
- }
- s3err.WriteErrorResponse(w, r, result.ErrorCode)
- return
- }
- // Check for specific version ID in query parameters
- versionId := r.URL.Query().Get("versionId")
- // Check if versioning is configured for the bucket (Enabled or Suspended)
- versioningConfigured, err := s3a.isVersioningConfigured(bucket)
- if err != nil {
- if err == filer_pb.ErrNotFound {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
- return
- }
- glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
- var destUrl string
- if versioningConfigured {
- // Handle versioned GET - all versions are stored in .versions directory
- var targetVersionId string
- var entry *filer_pb.Entry
- if versionId != "" {
- // Request for specific version
- glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
- entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
- if err != nil {
- glog.Errorf("Failed to get specific version %s: %v", versionId, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- targetVersionId = versionId
- } else {
- // Request for latest version
- glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
- entry, err = s3a.getLatestObjectVersion(bucket, object)
- if err != nil {
- glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- if entry.Extended != nil {
- if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
- targetVersionId = string(versionIdBytes)
- }
- }
- // If no version ID found in entry, this is a pre-versioning object
- if targetVersionId == "" {
- targetVersionId = "null"
- }
- }
- // Check if this is a delete marker
- if entry.Extended != nil {
- if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- }
- // Determine the actual file path based on whether this is a versioned or pre-versioning object
- if targetVersionId == "null" {
- // Pre-versioning object - stored as regular file
- destUrl = s3a.toFilerUrl(bucket, object)
- glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
- } else {
- // Versioned object - stored in .versions directory
- versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
- destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
- glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
- }
- // Set version ID in response header
- w.Header().Set("x-amz-version-id", targetVersionId)
- // Add object lock metadata to response headers if present
- s3a.addObjectLockHeadersToResponse(w, entry)
- } else {
- // Handle regular GET (non-versioned)
- destUrl = s3a.toFilerUrl(bucket, object)
- }
- // Check if this is a range request to an SSE object and modify the approach
- originalRangeHeader := r.Header.Get("Range")
- var sseObject = false
- // Pre-check if this object is SSE encrypted to avoid filer range conflicts
- if originalRangeHeader != "" {
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
- primarySSEType := s3a.detectPrimarySSEType(objectEntry)
- if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
- sseObject = true
- // Temporarily remove Range header to get full encrypted data from filer
- r.Header.Del("Range")
- }
- }
- }
- s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Restore the original Range header for SSE processing
- if sseObject && originalRangeHeader != "" {
- r.Header.Set("Range", originalRangeHeader)
- }
- // Add SSE metadata headers based on object metadata before SSE processing
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
- s3a.addSSEHeadersToResponse(proxyResponse, objectEntry)
- }
- // Handle SSE decryption (both SSE-C and SSE-KMS) if needed
- return s3a.handleSSEResponse(r, proxyResponse, w)
- })
- }
- func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := s3_constants.GetBucketAndObject(r)
- glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
- // Handle directory objects with shared logic
- if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "HeadObjectHandler") {
- return // Directory object request was handled
- }
- // Check conditional headers for read operations
- result := s3a.checkConditionalHeadersForReads(r, bucket, object)
- if result.ErrorCode != s3err.ErrNone {
- glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode)
- // For 304 Not Modified responses, include the ETag header
- if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" {
- w.Header().Set("ETag", result.ETag)
- }
- s3err.WriteErrorResponse(w, r, result.ErrorCode)
- return
- }
- // Check for specific version ID in query parameters
- versionId := r.URL.Query().Get("versionId")
- // Check if versioning is configured for the bucket (Enabled or Suspended)
- versioningConfigured, err := s3a.isVersioningConfigured(bucket)
- if err != nil {
- if err == filer_pb.ErrNotFound {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
- return
- }
- glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- var destUrl string
- if versioningConfigured {
- // Handle versioned HEAD - all versions are stored in .versions directory
- var targetVersionId string
- var entry *filer_pb.Entry
- if versionId != "" {
- // Request for specific version
- glog.V(2).Infof("HeadObject: requesting specific version %s for %s%s", versionId, bucket, object)
- entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
- if err != nil {
- glog.Errorf("Failed to get specific version %s: %v", versionId, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- targetVersionId = versionId
- } else {
- // Request for latest version
- glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
- entry, err = s3a.getLatestObjectVersion(bucket, object)
- if err != nil {
- glog.Errorf("Failed to get latest version: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- if entry.Extended != nil {
- if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
- targetVersionId = string(versionIdBytes)
- }
- }
- // If no version ID found in entry, this is a pre-versioning object
- if targetVersionId == "" {
- targetVersionId = "null"
- }
- }
- // Check if this is a delete marker
- if entry.Extended != nil {
- if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- }
- // Determine the actual file path based on whether this is a versioned or pre-versioning object
- if targetVersionId == "null" {
- // Pre-versioning object - stored as regular file
- destUrl = s3a.toFilerUrl(bucket, object)
- glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
- } else {
- // Versioned object - stored in .versions directory
- versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
- destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
- glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
- }
- // Set version ID in response header
- w.Header().Set("x-amz-version-id", targetVersionId)
- // Add object lock metadata to response headers if present
- s3a.addObjectLockHeadersToResponse(w, entry)
- } else {
- // Handle regular HEAD (non-versioned)
- destUrl = s3a.toFilerUrl(bucket, object)
- }
- s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
- return s3a.handleSSEResponse(r, proxyResponse, w)
- })
- }
- func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
- glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
- start := time.Now()
- proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
- if err != nil {
- glog.Errorf("NewRequest %s: %v", destUrl, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- proxyReq.Header.Set("Accept-Encoding", "identity")
- for k, v := range r.URL.Query() {
- if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
- proxyReq.Header[k] = v
- }
- if k == "partNumber" {
- proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
- }
- }
- for header, values := range r.Header {
- proxyReq.Header[header] = values
- }
- if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
- proxyReq.ContentLength = r.ContentLength
- }
- // ensure that the Authorization header is overriding any previous
- // Authorization header which might be already present in proxyReq
- s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
- resp, postErr := s3a.client.Do(proxyReq)
- if postErr != nil {
- glog.Errorf("post to filer: %v", postErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- defer util_http.CloseResponse(resp)
- if resp.StatusCode == http.StatusPreconditionFailed {
- s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
- return
- }
- if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
- return
- }
- if r.Method == http.MethodDelete {
- if resp.StatusCode == http.StatusNotFound {
- // this is normal
- responseStatusCode, _ := responseFn(resp, w)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
- return
- }
- }
- if resp.StatusCode == http.StatusNotFound {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- TimeToFirstByte(r.Method, start, r)
- if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
- responseStatusCode, _ := responseFn(resp, w)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
- return
- }
- if resp.StatusCode == http.StatusInternalServerError {
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- // when HEAD a directory, it should be reported as no such key
- // https://github.com/seaweedfs/seaweedfs/issues/3457
- if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- if resp.StatusCode == http.StatusBadRequest {
- resp_body, _ := io.ReadAll(resp.Body)
- switch string(resp_body) {
- case "InvalidPart":
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
- default:
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
- }
- resp.Body.Close()
- return
- }
- setUserMetadataKeyToLowercase(resp)
- responseStatusCode, bytesTransferred := responseFn(resp, w)
- BucketTrafficSent(bytesTransferred, r)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
- }
- func setUserMetadataKeyToLowercase(resp *http.Response) {
- for key, value := range resp.Header {
- if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
- resp.Header[strings.ToLower(key)] = value
- delete(resp.Header, key)
- }
- }
- }
- func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
- captured := make(map[string]string)
- for _, corsHeader := range headersToCapture {
- if value := w.Header().Get(corsHeader); value != "" {
- captured[corsHeader] = value
- }
- }
- return captured
- }
- func restoreCORSHeaders(w http.ResponseWriter, capturedCORSHeaders map[string]string) {
- for corsHeader, value := range capturedCORSHeaders {
- w.Header().Set(corsHeader, value)
- }
- }
- // writeFinalResponse handles the common response writing logic shared between
- // passThroughResponse and handleSSECResponse
- func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bodyReader io.Reader, capturedCORSHeaders map[string]string) (statusCode int, bytesTransferred int64) {
- // Restore CORS headers that were set by middleware
- restoreCORSHeaders(w, capturedCORSHeaders)
- if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
- statusCode = http.StatusPartialContent
- } else {
- statusCode = proxyResponse.StatusCode
- }
- w.WriteHeader(statusCode)
- // Stream response data
- buf := mem.Allocate(128 * 1024)
- defer mem.Free(buf)
- bytesTransferred, err := io.CopyBuffer(w, bodyReader, buf)
- if err != nil {
- glog.V(1).Infof("response read %d bytes: %v", bytesTransferred, err)
- }
- return statusCode, bytesTransferred
- }
- func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
- // Copy headers from proxy response
- for k, v := range proxyResponse.Header {
- w.Header()[k] = v
- }
- return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
- }
- // handleSSECResponse handles SSE-C decryption and response processing
- func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Check if the object has SSE-C metadata
- sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
- sseKeyMD5 := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5)
- isObjectEncrypted := sseAlgorithm != "" && sseKeyMD5 != ""
- // Parse SSE-C headers from request once (avoid duplication)
- customerKey, err := ParseSSECHeaders(r)
- if err != nil {
- errCode := MapSSECErrorToS3Error(err)
- s3err.WriteErrorResponse(w, r, errCode)
- return http.StatusBadRequest, 0
- }
- if isObjectEncrypted {
- // This object was encrypted with SSE-C, validate customer key
- if customerKey == nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- }
- // SSE-C MD5 is base64 and case-sensitive
- if customerKey.KeyMD5 != sseKeyMD5 {
- // For GET/HEAD requests, AWS S3 returns 403 Forbidden for a key mismatch.
- s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
- return http.StatusForbidden, 0
- }
- // SSE-C encrypted objects support HTTP Range requests
- // The IV is stored in metadata and CTR mode allows seeking to any offset
- // Range requests will be handled by the filer layer with proper offset-based decryption
- // Check if this is a chunked or small content SSE-C object
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- if entry, err := s3a.getEntry("", objectPath); err == nil {
- // Check for SSE-C chunks
- sseCChunks := 0
- for _, chunk := range entry.GetChunks() {
- if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
- sseCChunks++
- }
- }
- if sseCChunks >= 1 {
- // Handle chunked SSE-C objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
- if decErr != nil {
- glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- // Capture existing CORS headers
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
- // Copy headers from proxy response
- for k, v := range proxyResponse.Header {
- w.Header()[k] = v
- }
- // Set proper headers for range requests
- rangeHeader := r.Header.Get("Range")
- if rangeHeader != "" {
- // Parse range header (e.g., "bytes=0-99")
- if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
- rangeSpec := rangeHeader[6:]
- parts := strings.Split(rangeSpec, "-")
- if len(parts) == 2 {
- startOffset, endOffset := int64(0), int64(-1)
- if parts[0] != "" {
- startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
- }
- if parts[1] != "" {
- endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
- }
- if endOffset >= startOffset {
- // Specific range - set proper Content-Length and Content-Range headers
- rangeLength := endOffset - startOffset + 1
- totalSize := proxyResponse.Header.Get("Content-Length")
- w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10))
- w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize))
- // writeFinalResponse will set status to 206 if Content-Range is present
- }
- }
- }
- }
- return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders)
- } else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 {
- // Small content SSE-C object stored directly in entry.Content
- // Fall through to traditional single-object SSE-C handling below
- }
- }
- // Single-part SSE-C object: Get IV from proxy response headers (stored during upload)
- ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader)
- if ivBase64 == "" {
- glog.Errorf("SSE-C encrypted single-part object missing IV in metadata")
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- iv, err := base64.StdEncoding.DecodeString(ivBase64)
- if err != nil {
- glog.Errorf("Failed to decode IV from metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- // Create decrypted reader with IV from metadata
- decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv)
- if decErr != nil {
- glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
- // Copy headers from proxy response (excluding body-related headers that might change)
- for k, v := range proxyResponse.Header {
- if k != "Content-Length" && k != "Content-Encoding" {
- w.Header()[k] = v
- }
- }
- // Set correct Content-Length for SSE-C (only for full object requests)
- // With IV stored in metadata, the encrypted length equals the original length
- if proxyResponse.Header.Get("Content-Range") == "" {
- // Full object request: encrypted length equals original length (IV not in stream)
- if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
- // Content-Length is already correct since IV is stored in metadata, not in data stream
- w.Header().Set("Content-Length", contentLengthStr)
- }
- }
- // For range requests, let the actual bytes transferred determine the response length
- // Add SSE-C response headers
- w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, sseAlgorithm)
- w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, sseKeyMD5)
- return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
- } else {
- // Object is not encrypted, but check if customer provided SSE-C headers unnecessarily
- if customerKey != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyNotNeeded)
- return http.StatusBadRequest, 0
- }
- // Normal pass-through response
- return passThroughResponse(proxyResponse, w)
- }
- }
- // handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
- func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Check what the client is expecting based on request headers
- clientExpectsSSEC := IsSSECRequest(r)
- // Check what the stored object has in headers (may be conflicting after copy)
- kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
- sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm)
- // Get actual object state by examining chunks (most reliable for cross-encryption)
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- actualObjectType := "Unknown"
- if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
- actualObjectType = s3a.detectPrimarySSEType(objectEntry)
- }
- // Route based on ACTUAL object type (from chunks) rather than conflicting headers
- if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
- // Object is SSE-C and client expects SSE-C → SSE-C handler
- return s3a.handleSSECResponse(r, proxyResponse, w)
- } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
- // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
- return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
- } else if actualObjectType == "None" && !clientExpectsSSEC {
- // Object is unencrypted and client doesn't expect SSE-C → pass through
- return passThroughResponse(proxyResponse, w)
- } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
- // Object is SSE-C but client doesn't provide SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
- // Object is SSE-KMS but client provides SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- } else if actualObjectType == "None" && clientExpectsSSEC {
- // Object is unencrypted but client provides SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- }
- // Fallback for edge cases - use original logic with header-based detection
- if clientExpectsSSEC && sseAlgorithm != "" {
- return s3a.handleSSECResponse(r, proxyResponse, w)
- } else if !clientExpectsSSEC && kmsMetadataHeader != "" {
- return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader)
- } else {
- return passThroughResponse(proxyResponse, w)
- }
- }
- // handleSSEKMSResponse handles SSE-KMS decryption and response processing
- func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
- // Deserialize SSE-KMS metadata
- kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
- if err != nil {
- glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
- if err != nil {
- glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- // For HEAD requests, we don't need to decrypt the body, just add response headers
- if r.Method == "HEAD" {
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
- // Copy headers from proxy response
- for k, v := range proxyResponse.Header {
- w.Header()[k] = v
- }
- // Add SSE-KMS response headers
- AddSSEKMSResponseHeaders(w, sseKMSKey)
- return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
- }
- // For GET requests, check if this is a multipart SSE-KMS object
- // We need to check the object structure to determine if it's multipart encrypted
- isMultipartSSEKMS := false
- if sseKMSKey != nil {
- // Get the object entry to check chunk structure
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- if entry, err := s3a.getEntry("", objectPath); err == nil {
- // Check for multipart SSE-KMS
- sseKMSChunks := 0
- for _, chunk := range entry.GetChunks() {
- if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
- sseKMSChunks++
- }
- }
- isMultipartSSEKMS = sseKMSChunks > 1
- glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t",
- len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
- }
- }
- var decryptedReader io.Reader
- if isMultipartSSEKMS {
- // Handle multipart SSE-KMS objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
- if decErr != nil {
- glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = multipartReader
- glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
- } else {
- // Handle single-part SSE-KMS objects
- singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
- if decErr != nil {
- glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = singlePartReader
- glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
- }
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
- // Copy headers from proxy response (excluding body-related headers that might change)
- for k, v := range proxyResponse.Header {
- if k != "Content-Length" && k != "Content-Encoding" {
- w.Header()[k] = v
- }
- }
- // Set correct Content-Length for SSE-KMS
- if proxyResponse.Header.Get("Content-Range") == "" {
- // For full object requests, encrypted length equals original length
- if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
- w.Header().Set("Content-Length", contentLengthStr)
- }
- }
- // Add SSE-KMS response headers
- AddSSEKMSResponseHeaders(w, sseKMSKey)
- return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
- }
- // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
- // and adds the appropriate S3 headers to the response
- func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
- if entry == nil || entry.Extended == nil {
- return
- }
- // Check if this entry has any object lock metadata (indicating it's from an object lock enabled bucket)
- hasObjectLockMode := false
- hasRetentionDate := false
- // Add object lock mode header if present
- if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists && len(modeBytes) > 0 {
- w.Header().Set(s3_constants.AmzObjectLockMode, string(modeBytes))
- hasObjectLockMode = true
- }
- // Add retention until date header if present
- if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists && len(dateBytes) > 0 {
- dateStr := string(dateBytes)
- // Convert Unix timestamp to ISO8601 format for S3 compatibility
- if timestamp, err := strconv.ParseInt(dateStr, 10, 64); err == nil {
- retainUntilDate := time.Unix(timestamp, 0).UTC()
- w.Header().Set(s3_constants.AmzObjectLockRetainUntilDate, retainUntilDate.Format(time.RFC3339))
- hasRetentionDate = true
- } else {
- glog.Errorf("addObjectLockHeadersToResponse: failed to parse retention until date from stored metadata (dateStr: %s): %v", dateStr, err)
- }
- }
- // Add legal hold header - AWS S3 behavior: always include legal hold for object lock enabled buckets
- if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists && len(legalHoldBytes) > 0 {
- // Return stored S3 standard "ON"/"OFF" values directly
- w.Header().Set(s3_constants.AmzObjectLockLegalHold, string(legalHoldBytes))
- } else if hasObjectLockMode || hasRetentionDate {
- // If this entry has object lock metadata (indicating object lock enabled bucket)
- // but no legal hold specifically set, default to "OFF" as per AWS S3 behavior
- w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff)
- }
- }
- // addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers
- // Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts
- func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) {
- if entry == nil || entry.Extended == nil {
- return
- }
- // Determine the primary encryption type by examining chunks (most reliable)
- primarySSEType := s3a.detectPrimarySSEType(entry)
- // Only set headers for the PRIMARY encryption type
- switch primarySSEType {
- case s3_constants.SSETypeC:
- // Add only SSE-C headers
- if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 {
- proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes))
- }
- if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 {
- proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes))
- }
- if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 {
- ivBase64 := base64.StdEncoding.EncodeToString(ivBytes)
- proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64)
- }
- case s3_constants.SSETypeKMS:
- // Add only SSE-KMS headers
- if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 {
- proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm))
- }
- if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 {
- proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID))
- }
- default:
- // Unencrypted or unknown - don't set any SSE headers
- }
- glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended))
- }
- // detectPrimarySSEType determines the primary SSE type by examining chunk metadata
- func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
- if len(entry.GetChunks()) == 0 {
- // No chunks - check object-level metadata only (single objects or smallContent)
- hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
- hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil
- if hasSSEC && !hasSSEKMS {
- return s3_constants.SSETypeC
- } else if hasSSEKMS && !hasSSEC {
- return s3_constants.SSETypeKMS
- } else if hasSSEC && hasSSEKMS {
- // Both present - this should only happen during cross-encryption copies
- // Use content to determine actual encryption state
- if len(entry.Content) > 0 {
- // smallContent - check if it's encrypted (heuristic: random-looking data)
- return s3_constants.SSETypeC // Default to SSE-C for mixed case
- } else {
- // No content, both headers - default to SSE-C
- return s3_constants.SSETypeC
- }
- }
- return "None"
- }
- // Count chunk types to determine primary (multipart objects)
- ssecChunks := 0
- ssekmsChunks := 0
- for _, chunk := range entry.GetChunks() {
- switch chunk.GetSseType() {
- case filer_pb.SSEType_SSE_C:
- ssecChunks++
- case filer_pb.SSEType_SSE_KMS:
- ssekmsChunks++
- }
- }
- // Primary type is the one with more chunks
- if ssecChunks > ssekmsChunks {
- return s3_constants.SSETypeC
- } else if ssekmsChunks > ssecChunks {
- return s3_constants.SSETypeKMS
- } else if ssecChunks > 0 {
- // Equal number, prefer SSE-C (shouldn't happen in practice)
- return s3_constants.SSETypeC
- }
- return "None"
- }
- // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
- func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
- // Get the object path from the request
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- // Get the object entry from filer to access chunk information
- entry, err := s3a.getEntry("", objectPath)
- if err != nil {
- return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
- }
- // Sort chunks by offset to ensure correct order
- chunks := entry.GetChunks()
- sort.Slice(chunks, func(i, j int) bool {
- return chunks[i].GetOffset() < chunks[j].GetOffset()
- })
- // Create readers for each chunk, decrypting them independently
- var readers []io.Reader
- for i, chunk := range chunks {
- glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d",
- i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType())
- // Get this chunk's encrypted data
- chunkReader, err := s3a.createEncryptedChunkReader(chunk)
- if err != nil {
- return nil, fmt.Errorf("failed to create chunk reader: %v", err)
- }
- // Get SSE-KMS metadata for this chunk
- var chunkSSEKMSKey *SSEKMSKey
- // Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
- if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
- // Use the per-chunk SSE-KMS metadata
- kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
- if err != nil {
- glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
- } else {
- // ChunkOffset is already set from the stored metadata (PartOffset)
- chunkSSEKMSKey = kmsKey
- glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d",
- chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset)
- }
- }
- // Fallback to object-level metadata (legacy support)
- if chunkSSEKMSKey == nil {
- objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
- if objectMetadataHeader != "" {
- kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader)
- if decodeErr == nil {
- kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes)
- if kmsKey != nil {
- // For object-level metadata (legacy), use absolute file offset as fallback
- kmsKey.ChunkOffset = chunk.GetOffset()
- chunkSSEKMSKey = kmsKey
- }
- glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset())
- }
- }
- }
- if chunkSSEKMSKey == nil {
- return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
- }
- // Create decrypted reader for this chunk
- decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
- if decErr != nil {
- chunkReader.Close() // Close the chunk reader if decryption fails
- return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
- }
- // Use the streaming decrypted reader directly instead of reading into memory
- readers = append(readers, decryptedChunkReader)
- glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
- }
- // Combine all decrypted chunk readers into a single stream with proper resource management
- multiReader := NewMultipartSSEReader(readers)
- glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
- return multiReader, nil
- }
- // createEncryptedChunkReader creates a reader for a single encrypted chunk
- func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
- // Get chunk URL
- srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
- if err != nil {
- return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
- }
- // Create HTTP request for chunk data
- req, err := http.NewRequest("GET", srcUrl, nil)
- if err != nil {
- return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
- }
- // Execute request
- resp, err := http.DefaultClient.Do(req)
- if err != nil {
- return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
- }
- if resp.StatusCode != http.StatusOK {
- resp.Body.Close()
- return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode)
- }
- return resp.Body, nil
- }
- // MultipartSSEReader wraps multiple readers and ensures all underlying readers are properly closed
- type MultipartSSEReader struct {
- multiReader io.Reader
- readers []io.Reader
- }
- // SSERangeReader applies range logic to an underlying reader
- type SSERangeReader struct {
- reader io.Reader
- offset int64 // bytes to skip from the beginning
- remaining int64 // bytes remaining to read (-1 for unlimited)
- skipped int64 // bytes already skipped
- }
- // NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
- func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader {
- return &MultipartSSEReader{
- multiReader: io.MultiReader(readers...),
- readers: readers,
- }
- }
- // Read implements the io.Reader interface
- func (m *MultipartSSEReader) Read(p []byte) (n int, err error) {
- return m.multiReader.Read(p)
- }
- // Close implements the io.Closer interface and closes all underlying readers that support closing
- func (m *MultipartSSEReader) Close() error {
- var lastErr error
- for i, reader := range m.readers {
- if closer, ok := reader.(io.Closer); ok {
- if err := closer.Close(); err != nil {
- glog.V(2).Infof("Error closing reader %d: %v", i, err)
- lastErr = err // Keep track of the last error, but continue closing others
- }
- }
- }
- return lastErr
- }
- // Read implements the io.Reader interface for SSERangeReader
- func (r *SSERangeReader) Read(p []byte) (n int, err error) {
- // If we need to skip bytes and haven't skipped enough yet
- if r.skipped < r.offset {
- skipNeeded := r.offset - r.skipped
- skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
- skipRead, skipErr := r.reader.Read(skipBuf)
- r.skipped += int64(skipRead)
- if skipErr != nil {
- return 0, skipErr
- }
- // If we still need to skip more, recurse
- if r.skipped < r.offset {
- return r.Read(p)
- }
- }
- // If we have a remaining limit and it's reached
- if r.remaining == 0 {
- return 0, io.EOF
- }
- // Calculate how much to read
- readSize := len(p)
- if r.remaining > 0 && int64(readSize) > r.remaining {
- readSize = int(r.remaining)
- }
- // Read the data
- n, err = r.reader.Read(p[:readSize])
- if r.remaining > 0 {
- r.remaining -= int64(n)
- }
- return n, err
- }
- // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
- // Each chunk has its own IV and encryption key from the original multipart parts
- func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
- // Parse SSE-C headers from the request for decryption key
- customerKey, err := ParseSSECHeaders(r)
- if err != nil {
- return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
- }
- // Get the object path from the request
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- // Get the object entry from filer to access chunk information
- entry, err := s3a.getEntry("", objectPath)
- if err != nil {
- return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
- }
- // Sort chunks by offset to ensure correct order
- chunks := entry.GetChunks()
- sort.Slice(chunks, func(i, j int) bool {
- return chunks[i].GetOffset() < chunks[j].GetOffset()
- })
- // Check for Range header to optimize chunk processing
- var startOffset, endOffset int64 = 0, -1
- rangeHeader := r.Header.Get("Range")
- if rangeHeader != "" {
- // Parse range header (e.g., "bytes=0-99")
- if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" {
- rangeSpec := rangeHeader[6:]
- parts := strings.Split(rangeSpec, "-")
- if len(parts) == 2 {
- if parts[0] != "" {
- startOffset, _ = strconv.ParseInt(parts[0], 10, 64)
- }
- if parts[1] != "" {
- endOffset, _ = strconv.ParseInt(parts[1], 10, 64)
- }
- }
- }
- }
- // Filter chunks to only those needed for the range request
- var neededChunks []*filer_pb.FileChunk
- for _, chunk := range chunks {
- chunkStart := chunk.GetOffset()
- chunkEnd := chunkStart + int64(chunk.GetSize()) - 1
- // Check if this chunk overlaps with the requested range
- if endOffset == -1 {
- // No end specified, take all chunks from startOffset
- if chunkEnd >= startOffset {
- neededChunks = append(neededChunks, chunk)
- }
- } else {
- // Specific range: check for overlap
- if chunkStart <= endOffset && chunkEnd >= startOffset {
- neededChunks = append(neededChunks, chunk)
- }
- }
- }
- // Create readers for only the needed chunks
- var readers []io.Reader
- for _, chunk := range neededChunks {
- // Get this chunk's encrypted data
- chunkReader, err := s3a.createEncryptedChunkReader(chunk)
- if err != nil {
- return nil, fmt.Errorf("failed to create chunk reader: %v", err)
- }
- if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
- // For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach)
- if len(chunk.GetSseMetadata()) > 0 {
- // Deserialize the SSE-C metadata stored in the unified metadata field
- ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseMetadata())
- if decErr != nil {
- return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr)
- }
- // Decode the IV from the metadata
- iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV)
- if ivErr != nil {
- return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
- }
- // Calculate the correct IV for this chunk using within-part offset
- var chunkIV []byte
- if ssecMetadata.PartOffset > 0 {
- chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
- } else {
- chunkIV = iv
- }
- decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
- if decErr != nil {
- return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr)
- }
- readers = append(readers, decryptedReader)
- glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString())
- } else {
- return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString())
- }
- } else {
- // Non-SSE-C chunk, use as-is
- readers = append(readers, chunkReader)
- }
- }
- multiReader := NewMultipartSSEReader(readers)
- // Apply range logic if a range was requested
- if rangeHeader != "" && startOffset >= 0 {
- if endOffset == -1 {
- // Open-ended range (e.g., "bytes=100-")
- return &SSERangeReader{
- reader: multiReader,
- offset: startOffset,
- remaining: -1, // Read until EOF
- }, nil
- } else {
- // Specific range (e.g., "bytes=0-99")
- rangeLength := endOffset - startOffset + 1
- return &SSERangeReader{
- reader: multiReader,
- offset: startOffset,
- remaining: rangeLength,
- }, nil
- }
- }
- return multiReader, nil
- }
|