| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301 |
- package s3api
- import (
- "crypto/md5"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "net/http"
- "strconv"
- "strings"
- "time"
- "github.com/pquerna/cachecontrol/cacheobject"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "github.com/seaweedfs/seaweedfs/weed/security"
- weed_server "github.com/seaweedfs/seaweedfs/weed/server"
- stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
- "github.com/seaweedfs/seaweedfs/weed/util/constants"
- )
// Object lock validation errors.
//
// These are sentinel errors for invalid object lock request headers and for
// invalid bucket-level object lock / default retention configuration. Each
// message states the rule that was violated. In this part of the file only
// ErrInvalidRetentionDateFormat and ErrInvalidLegalHoldStatus are returned
// directly (by extractObjectLockMetadataFromRequest); the remaining values are
// presumably produced by validation helpers defined elsewhere - verify against
// their callers before changing a message.
var (
	ErrObjectLockVersioningRequired          = errors.New("object lock headers can only be used on versioned buckets")
	ErrInvalidObjectLockMode                 = errors.New("invalid object lock mode")
	ErrInvalidLegalHoldStatus                = errors.New("invalid legal hold status")
	ErrInvalidRetentionDateFormat            = errors.New("invalid retention until date format")
	ErrRetentionDateMustBeFuture             = errors.New("retain until date must be in the future")
	ErrObjectLockModeRequiresDate            = errors.New("object lock mode requires retention until date")
	ErrRetentionDateRequiresMode             = errors.New("retention until date requires object lock mode")
	ErrGovernanceBypassVersioningRequired    = errors.New("governance bypass header can only be used on versioned buckets")
	ErrInvalidObjectLockDuration             = errors.New("object lock duration must be greater than 0 days")
	ErrObjectLockDurationExceeded            = errors.New("object lock duration exceeds maximum allowed days")
	ErrObjectLockConfigurationMissingEnabled = errors.New("object lock configuration must specify ObjectLockEnabled")
	ErrInvalidObjectLockEnabledValue         = errors.New("invalid object lock enabled value")
	ErrRuleMissingDefaultRetention           = errors.New("rule configuration must specify DefaultRetention")
	ErrDefaultRetentionMissingMode           = errors.New("default retention must specify Mode")
	ErrInvalidDefaultRetentionMode           = errors.New("invalid default retention mode")
	ErrDefaultRetentionMissingPeriod         = errors.New("default retention must specify either Days or Years")
	ErrDefaultRetentionBothDaysAndYears      = errors.New("default retention cannot specify both Days and Years")
	ErrDefaultRetentionDaysOutOfRange        = errors.New("default retention days must be between 0 and 36500")
	ErrDefaultRetentionYearsOutOfRange       = errors.New("default retention years must be between 0 and 100")
)
- // hasExplicitEncryption checks if any explicit encryption was provided in the request.
- // This helper improves readability and makes the encryption check condition more explicit.
- func hasExplicitEncryption(customerKey *SSECustomerKey, sseKMSKey *SSEKMSKey, sseS3Key *SSES3Key) bool {
- return customerKey != nil || sseKMSKey != nil || sseS3Key != nil
- }
// BucketDefaultEncryptionResult holds the result of bucket default encryption processing.
// putToFiler reads all three fields from the value returned by
// applyBucketDefaultEncryption and continues the upload with them.
type BucketDefaultEncryptionResult struct {
	DataReader io.Reader  // reader the upload continues with after default encryption was considered
	SSES3Key   *SSES3Key  // when non-nil, putToFiler serializes and stores SSE-S3 metadata for it
	SSEKMSKey  *SSEKMSKey // when non-nil, putToFiler stores SSE-KMS metadata for the object
}
- func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
- // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
- bucket, object := s3_constants.GetBucketAndObject(r)
- authHeader := r.Header.Get("Authorization")
- authPreview := authHeader
- if len(authHeader) > 50 {
- authPreview = authHeader[:50] + "..."
- }
- glog.V(0).Infof("PutObjectHandler: Starting PUT %s/%s (Auth: %s)", bucket, object, authPreview)
- glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
- _, err := validateContentMd5(r.Header)
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
- return
- }
- // Check conditional headers
- if errCode := s3a.checkConditionalHeaders(r, bucket, object); errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- if r.Header.Get("Cache-Control") != "" {
- if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
- return
- }
- }
- if r.Header.Get("Expires") != "" {
- if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrMalformedDate)
- return
- }
- }
- dataReader, s3ErrCode := getRequestDataReader(s3a, r)
- if s3ErrCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, s3ErrCode)
- return
- }
- defer dataReader.Close()
- objectContentType := r.Header.Get("Content-Type")
- if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
- if err := s3a.mkdir(
- s3a.option.BucketsPath, bucket+strings.TrimSuffix(object, "/"),
- func(entry *filer_pb.Entry) {
- if objectContentType == "" {
- objectContentType = s3_constants.FolderMimeType
- }
- if r.ContentLength > 0 {
- entry.Content, _ = io.ReadAll(r.Body)
- }
- entry.Attributes.Mime = objectContentType
- // Set object owner for directory objects (same as regular objects)
- s3a.setObjectOwnerFromRequest(r, entry)
- }); err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- } else {
- // Get detailed versioning state for the bucket
- versioningState, err := s3a.getVersioningState(bucket)
- if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
- return
- }
- glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
- versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
- versioningConfigured := (versioningState != "")
- glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState)
- // Validate object lock headers before processing
- if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
- glog.V(2).Infof("PutObjectHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
- s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
- return
- }
- // For non-versioned buckets, check if existing object has object lock protections
- // that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets)
- if !versioningConfigured {
- governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
- if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
- glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
- return
- }
- }
- if versioningState == s3_constants.VersioningEnabled {
- // Handle enabled versioning - create new versions with real version IDs
- glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
- versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- // Set version ID in response header
- if versionId != "" {
- w.Header().Set("x-amz-version-id", versionId)
- }
- // Set ETag in response
- setEtag(w, etag)
- } else if versioningState == s3_constants.VersioningSuspended {
- // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
- glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object)
- etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec
- // The object is stored with "null" version internally but no version header is returned
- // Set ETag in response
- setEtag(w, etag)
- } else {
- // Handle regular PUT (never configured versioning)
- glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
- uploadUrl := s3a.toFilerUrl(bucket, object)
- if objectContentType == "" {
- dataReader = mimeDetect(r, dataReader)
- }
- etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- // No version ID header for never-configured versioning
- setEtag(w, etag)
- // Set SSE response headers based on encryption type used
- if sseType == s3_constants.SSETypeS3 {
- w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
- }
- }
- }
- stats_collect.RecordBucketActiveTime(bucket)
- stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
- writeSuccessResponseEmpty(w, r)
- }
- func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
- // Calculate unique offset for each part to prevent IV reuse in multipart uploads
- // This is critical for CTR mode encryption security
- partOffset := calculatePartOffset(partNumber)
- // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
- sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
- if sseErrorCode != s3err.ErrNone {
- return "", sseErrorCode, ""
- }
- // Extract results from unified SSE handling
- dataReader = sseResult.DataReader
- customerKey := sseResult.CustomerKey
- sseIV := sseResult.SSEIV
- sseKMSKey := sseResult.SSEKMSKey
- sseKMSMetadata := sseResult.SSEKMSMetadata
- sseS3Key := sseResult.SSES3Key
- sseS3Metadata := sseResult.SSES3Metadata
- // Apply bucket default encryption if no explicit encryption was provided
- // This implements AWS S3 behavior where bucket default encryption automatically applies
- if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) {
- glog.V(4).Infof("putToFiler: no explicit encryption detected, checking for bucket default encryption")
- // Apply bucket default encryption and get the result
- encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
- if applyErr != nil {
- glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
- return "", s3err.ErrInternalError, ""
- }
- // Update variables based on the result
- dataReader = encryptionResult.DataReader
- sseS3Key = encryptionResult.SSES3Key
- sseKMSKey = encryptionResult.SSEKMSKey
- // If SSE-S3 was applied by bucket default, prepare metadata (if not already done)
- if sseS3Key != nil && len(sseS3Metadata) == 0 {
- var metaErr error
- sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
- if metaErr != nil {
- glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
- return "", s3err.ErrInternalError, ""
- }
- }
- } else {
- glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
- }
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
- proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
- if err != nil {
- glog.Errorf("NewRequest %s: %v", uploadUrl, err)
- return "", s3err.ErrInternalError, ""
- }
- proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- if destination != "" {
- proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
- }
- if s3a.option.FilerGroup != "" {
- query := proxyReq.URL.Query()
- query.Add("collection", s3a.getCollectionName(bucket))
- proxyReq.URL.RawQuery = query.Encode()
- }
- for header, values := range r.Header {
- for _, value := range values {
- proxyReq.Header.Add(header, value)
- }
- }
- // Set object owner header for filer to extract
- amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
- if amzAccountId != "" {
- proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
- glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
- }
- // Set SSE-C metadata headers for the filer if encryption was applied
- if customerKey != nil && len(sseIV) > 0 {
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
- // Store IV in a custom header that the filer can use to store in entry metadata
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
- }
- // Set SSE-KMS metadata headers for the filer if KMS encryption was applied
- if sseKMSKey != nil {
- // Use already-serialized SSE-KMS metadata from helper function
- // Store serialized KMS metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
- glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
- } else {
- glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
- }
- // Set SSE-S3 metadata headers for the filer if S3 encryption was applied
- if sseS3Key != nil && len(sseS3Metadata) > 0 {
- // Store serialized S3 metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
- glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
- }
- // ensure that the Authorization header is overriding any previous
- // Authorization header which might be already present in proxyReq
- s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
- resp, postErr := s3a.client.Do(proxyReq)
- if postErr != nil {
- glog.Errorf("post to filer: %v", postErr)
- if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
- return "", s3err.ErrInvalidDigest, ""
- }
- return "", s3err.ErrInternalError, ""
- }
- defer resp.Body.Close()
- etag = fmt.Sprintf("%x", hash.Sum(nil))
- resp_body, ra_err := io.ReadAll(resp.Body)
- if ra_err != nil {
- glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
- return etag, s3err.ErrInternalError, ""
- }
- var ret weed_server.FilerPostResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
- return "", s3err.ErrInternalError, ""
- }
- if ret.Error != "" {
- glog.Errorf("upload to filer error: %v", ret.Error)
- return "", filerErrorToS3Error(ret.Error), ""
- }
- BucketTrafficReceived(ret.Size, r)
- // Return the SSE type determined by the unified handler
- return etag, s3err.ErrNone, sseResult.SSEType
- }
// setEtag stores etag in the response under the header key "ETag".
// An empty etag leaves the headers untouched. A value that does not already
// start with a double quote is wrapped in double quotes first.
func setEtag(w http.ResponseWriter, etag string) {
	if etag == "" {
		return
	}

	value := etag
	if !strings.HasPrefix(etag, "\"") {
		value = "\"" + etag + "\""
	}

	// The header map is written directly, so the key keeps the exact spelling
	// "ETag" instead of the canonical form Header.Set would produce.
	w.Header()["ETag"] = []string{value}
}
- func filerErrorToS3Error(errString string) s3err.ErrorCode {
- switch {
- case errString == constants.ErrMsgBadDigest:
- return s3err.ErrBadDigest
- case strings.Contains(errString, "context canceled") || strings.Contains(errString, "code = Canceled"):
- // Client canceled the request, return client error not server error
- return s3err.ErrInvalidRequest
- case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"):
- return s3err.ErrExistingObjectIsDirectory
- case strings.HasSuffix(errString, "is a file"):
- return s3err.ErrExistingObjectIsFile
- default:
- return s3err.ErrInternalError
- }
- }
- func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
- encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
- if encodedJwt == "" {
- return
- }
- r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
- }
- func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
- var encodedJwt security.EncodedJwt
- if isWrite {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
- } else {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
- }
- return string(encodedJwt)
- }
- // setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
- func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
- amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
- if amzAccountId != "" {
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
- glog.V(2).Infof("setObjectOwnerFromRequest: set object owner to %s", amzAccountId)
- }
- }
- // putVersionedObject handles PUT operations for versioned buckets using the new layout
- // where all versions (including latest) are stored in the .versions directory
- func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
- // For suspended versioning, store as regular object (version ID "null") but preserve existing versions
- glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object)
- uploadUrl := s3a.toFilerUrl(bucket, object)
- if objectContentType == "" {
- dataReader = mimeDetect(r, dataReader)
- }
- etag, errCode, _ = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
- if errCode != s3err.ErrNone {
- glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
- return "", errCode
- }
- // Get the uploaded entry to add version metadata indicating this is "null" version
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- entry, err := s3a.getEntry(bucketDir, object)
- if err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err)
- return "", s3err.ErrInternalError
- }
- // Add metadata to indicate this is a "null" version for suspended versioning
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
- // Set object owner for suspended versioning objects
- s3a.setObjectOwnerFromRequest(r, entry)
- // Extract and store object lock metadata from request headers (if any)
- if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
- return "", s3err.ErrInvalidRequest
- }
- // Update the entry with metadata
- err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
- updatedEntry.Extended = entry.Extended
- updatedEntry.Attributes = entry.Attributes
- updatedEntry.Chunks = entry.Chunks
- })
- if err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err)
- return "", s3err.ErrInternalError
- }
- // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
- err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object)
- if err != nil {
- glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
- // Don't fail the request, but log the warning
- }
- glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
- return etag, s3err.ErrNone
- }
- // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
- // when a new "null" version becomes the latest during suspended versioning
- func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionsObjectPath := object + ".versions"
- versionsDir := bucketDir + "/" + versionsObjectPath
- glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object)
- // Check if .versions directory exists
- _, err := s3a.getEntry(bucketDir, versionsObjectPath)
- if err != nil {
- // No .versions directory exists, nothing to update
- glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s%s", bucket, object)
- return nil
- }
- // List all entries in .versions directory
- entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
- if err != nil {
- return fmt.Errorf("failed to list versions directory: %v", err)
- }
- glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: found %d entries to update", len(entries))
- // Update each version/delete marker to set IsLatest=false
- for _, entry := range entries {
- if entry.Extended == nil {
- continue
- }
- // Check if this entry has a version ID (it should be a version or delete marker)
- versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
- if !hasVersionId {
- continue
- }
- versionId := string(versionIdBytes)
- glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: setting IsLatest=false for version %s", versionId)
- // Update the entry to set IsLatest=false (we don't explicitly store this flag,
- // it's determined by comparison with latest version metadata)
- // We need to clear the latest version metadata from the .versions directory
- // so that our getObjectVersionList function will correctly show IsLatest=false
- }
- // Clear the latest version metadata from .versions directory since "null" is now latest
- versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
- if err == nil && versionsEntry.Extended != nil {
- // Remove latest version metadata so all versions show IsLatest=false
- delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
- delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
- // Update the .versions directory entry
- err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
- updatedEntry.Extended = versionsEntry.Extended
- updatedEntry.Attributes = versionsEntry.Attributes
- updatedEntry.Chunks = versionsEntry.Chunks
- })
- if err != nil {
- return fmt.Errorf("failed to update .versions directory metadata: %v", err)
- }
- glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s%s", bucket, object)
- }
- return nil
- }
- func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
- // Generate version ID
- versionId = generateVersionId()
- glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
- // Create the version file name
- versionFileName := s3a.getVersionFileName(versionId)
- // Upload directly to the versions directory
- // We need to construct the object path relative to the bucket
- versionObjectPath := object + ".versions/" + versionFileName
- versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
- if objectContentType == "" {
- body = mimeDetect(r, body)
- }
- glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
- etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
- if errCode != s3err.ErrNone {
- glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
- return "", "", errCode
- }
- // Get the uploaded entry to add versioning metadata
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
- if err != nil {
- glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
- return "", "", s3err.ErrInternalError
- }
- // Add versioning metadata to this version
- if versionEntry.Extended == nil {
- versionEntry.Extended = make(map[string][]byte)
- }
- versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
- // Store ETag with quotes for S3 compatibility
- if !strings.HasPrefix(etag, "\"") {
- etag = "\"" + etag + "\""
- }
- versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
- // Set object owner for versioned objects
- s3a.setObjectOwnerFromRequest(r, versionEntry)
- // Extract and store object lock metadata from request headers
- if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
- glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
- return "", "", s3err.ErrInvalidRequest
- }
- // Update the version entry with metadata
- err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
- updatedEntry.Extended = versionEntry.Extended
- updatedEntry.Attributes = versionEntry.Attributes
- updatedEntry.Chunks = versionEntry.Chunks
- })
- if err != nil {
- glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
- return "", "", s3err.ErrInternalError
- }
- // Update the .versions directory metadata to indicate this is the latest version
- err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
- if err != nil {
- glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
- return "", "", s3err.ErrInternalError
- }
- glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
- return versionId, etag, s3err.ErrNone
- }
- // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
- func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionsObjectPath := object + ".versions"
- // Get the current .versions directory entry
- versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
- if err != nil {
- glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
- return fmt.Errorf("failed to get .versions entry: %w", err)
- }
- // Add or update the latest version metadata
- if versionsEntry.Extended == nil {
- versionsEntry.Extended = make(map[string][]byte)
- }
- versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId)
- versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName)
- // Update the .versions directory entry with metadata
- err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
- updatedEntry.Extended = versionsEntry.Extended
- updatedEntry.Attributes = versionsEntry.Attributes
- updatedEntry.Chunks = versionsEntry.Chunks
- })
- if err != nil {
- glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err)
- return fmt.Errorf("failed to update .versions directory metadata: %w", err)
- }
- return nil
- }
- // extractObjectLockMetadataFromRequest extracts object lock headers from PUT requests
- // and applies bucket default retention if no explicit retention is provided
- func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, entry *filer_pb.Entry) error {
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- // Extract explicit object lock mode (GOVERNANCE or COMPLIANCE)
- explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
- if explicitMode != "" {
- entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(explicitMode)
- glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit object lock mode: %s", explicitMode)
- }
- // Extract explicit retention until date
- explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
- if explicitRetainUntilDate != "" {
- // Parse the ISO8601 date and convert to Unix timestamp for storage
- parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
- if err != nil {
- glog.Errorf("extractObjectLockMetadataFromRequest: failed to parse retention until date, expected format: %s, error: %v", time.RFC3339, err)
- return ErrInvalidRetentionDateFormat
- }
- entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(parsedTime.Unix(), 10))
- glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit retention until date (timestamp: %d)", parsedTime.Unix())
- }
- // Extract legal hold status
- if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" {
- // Store S3 standard "ON"/"OFF" values directly
- if legalHold == s3_constants.LegalHoldOn || legalHold == s3_constants.LegalHoldOff {
- entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold)
- glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing legal hold: %s", legalHold)
- } else {
- glog.Errorf("extractObjectLockMetadataFromRequest: unexpected legal hold value provided, expected 'ON' or 'OFF'")
- return ErrInvalidLegalHoldStatus
- }
- }
- // Apply bucket default retention if no explicit retention was provided
- // This implements AWS S3 behavior where bucket default retention automatically applies to new objects
- if explicitMode == "" && explicitRetainUntilDate == "" {
- bucket, _ := s3_constants.GetBucketAndObject(r)
- if err := s3a.applyBucketDefaultRetention(bucket, entry); err != nil {
- glog.V(2).Infof("extractObjectLockMetadataFromRequest: skipping bucket default retention for %s: %v", bucket, err)
- // Don't fail the upload if default retention can't be applied - this matches AWS behavior
- }
- }
- return nil
- }
- // applyBucketDefaultEncryption applies bucket default encryption settings to a new object
- // This implements AWS S3 behavior where bucket default encryption automatically applies to new objects
- // when no explicit encryption headers are provided in the upload request.
- // Returns the modified dataReader and encryption keys instead of using pointer parameters for better code clarity.
- func (s3a *S3ApiServer) applyBucketDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
- // Check if bucket has default encryption configured
- encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
- if err != nil || encryptionConfig == nil {
- // No default encryption configured, return original reader
- return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
- }
- if encryptionConfig.SseAlgorithm == "" {
- // No encryption algorithm specified
- return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
- }
- glog.V(3).Infof("applyBucketDefaultEncryption: applying default encryption %s for bucket %s", encryptionConfig.SseAlgorithm, bucket)
- switch encryptionConfig.SseAlgorithm {
- case EncryptionTypeAES256:
- // Apply SSE-S3 (AES256) encryption
- return s3a.applySSES3DefaultEncryption(dataReader)
- case EncryptionTypeKMS:
- // Apply SSE-KMS encryption
- return s3a.applySSEKMSDefaultEncryption(bucket, r, dataReader, encryptionConfig)
- default:
- return nil, fmt.Errorf("unsupported default encryption algorithm: %s", encryptionConfig.SseAlgorithm)
- }
- }
- // applySSES3DefaultEncryption applies SSE-S3 encryption as bucket default
- func (s3a *S3ApiServer) applySSES3DefaultEncryption(dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
- // Generate SSE-S3 key
- keyManager := GetSSES3KeyManager()
- key, err := keyManager.GetOrCreateKey("")
- if err != nil {
- return nil, fmt.Errorf("failed to generate SSE-S3 key for default encryption: %v", err)
- }
- // Create encrypted reader
- encryptedReader, iv, encErr := CreateSSES3EncryptedReader(dataReader, key)
- if encErr != nil {
- return nil, fmt.Errorf("failed to create SSE-S3 encrypted reader for default encryption: %v", encErr)
- }
- // Store IV on the key object for later decryption
- key.IV = iv
- // Store key in manager for later retrieval
- keyManager.StoreKey(key)
- glog.V(3).Infof("applySSES3DefaultEncryption: applied SSE-S3 default encryption with key ID: %s", key.KeyID)
- return &BucketDefaultEncryptionResult{
- DataReader: encryptedReader,
- SSES3Key: key,
- }, nil
- }
- // applySSEKMSDefaultEncryption applies SSE-KMS encryption as bucket default
- func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader, encryptionConfig *s3_pb.EncryptionConfiguration) (*BucketDefaultEncryptionResult, error) {
- // Use the KMS key ID from bucket configuration, or default if not specified
- keyID := encryptionConfig.KmsKeyId
- if keyID == "" {
- keyID = "alias/aws/s3" // AWS default KMS key for S3
- }
- // Check if bucket key is enabled in configuration
- bucketKeyEnabled := encryptionConfig.BucketKeyEnabled
- // Build encryption context for KMS
- bucket, object := s3_constants.GetBucketAndObject(r)
- encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled)
- // Create SSE-KMS encrypted reader
- encryptedReader, sseKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(dataReader, keyID, encryptionContext, bucketKeyEnabled)
- if encErr != nil {
- return nil, fmt.Errorf("failed to create SSE-KMS encrypted reader for default encryption: %v", encErr)
- }
- glog.V(3).Infof("applySSEKMSDefaultEncryption: applied SSE-KMS default encryption with key ID: %s", keyID)
- return &BucketDefaultEncryptionResult{
- DataReader: encryptedReader,
- SSEKMSKey: sseKey,
- }, nil
- }
- // applyBucketDefaultRetention applies bucket default retention settings to a new object
- // This implements AWS S3 behavior where bucket default retention automatically applies to new objects
- // when no explicit retention headers are provided in the upload request
- func (s3a *S3ApiServer) applyBucketDefaultRetention(bucket string, entry *filer_pb.Entry) error {
- // Safety check - if bucket config cache is not available, skip default retention
- if s3a.bucketConfigCache == nil {
- return nil
- }
- // Get bucket configuration (getBucketConfig handles caching internally)
- bucketConfig, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- return fmt.Errorf("failed to get bucket config: %v", errCode)
- }
- // Check if bucket has cached Object Lock configuration
- if bucketConfig.ObjectLockConfig == nil {
- return nil // No Object Lock configuration
- }
- objectLockConfig := bucketConfig.ObjectLockConfig
- // Check if there's a default retention rule
- if objectLockConfig.Rule == nil || objectLockConfig.Rule.DefaultRetention == nil {
- return nil // No default retention configured
- }
- defaultRetention := objectLockConfig.Rule.DefaultRetention
- // Validate default retention has required fields
- if defaultRetention.Mode == "" {
- return fmt.Errorf("default retention missing mode")
- }
- if !defaultRetention.DaysSet && !defaultRetention.YearsSet {
- return fmt.Errorf("default retention missing period")
- }
- // Calculate retention until date based on default retention period
- var retainUntilDate time.Time
- now := time.Now()
- if defaultRetention.DaysSet && defaultRetention.Days > 0 {
- retainUntilDate = now.AddDate(0, 0, defaultRetention.Days)
- } else if defaultRetention.YearsSet && defaultRetention.Years > 0 {
- retainUntilDate = now.AddDate(defaultRetention.Years, 0, 0)
- }
- // Apply default retention to the object
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(defaultRetention.Mode)
- entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retainUntilDate.Unix(), 10))
- glog.V(2).Infof("applyBucketDefaultRetention: applied default retention %s until %s for bucket %s",
- defaultRetention.Mode, retainUntilDate.Format(time.RFC3339), bucket)
- return nil
- }
- // validateObjectLockHeaders validates object lock headers in PUT requests
- func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEnabled bool) error {
- // Extract object lock headers from request
- mode := r.Header.Get(s3_constants.AmzObjectLockMode)
- retainUntilDateStr := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
- legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold)
- // Check if any object lock headers are present
- hasObjectLockHeaders := mode != "" || retainUntilDateStr != "" || legalHold != ""
- // Object lock headers can only be used on versioned buckets
- if hasObjectLockHeaders && !versioningEnabled {
- return ErrObjectLockVersioningRequired
- }
- // Validate object lock mode if present
- if mode != "" {
- if mode != s3_constants.RetentionModeGovernance && mode != s3_constants.RetentionModeCompliance {
- return ErrInvalidObjectLockMode
- }
- }
- // Validate retention date if present
- if retainUntilDateStr != "" {
- retainUntilDate, err := time.Parse(time.RFC3339, retainUntilDateStr)
- if err != nil {
- return ErrInvalidRetentionDateFormat
- }
- // Retention date must be in the future
- if retainUntilDate.Before(time.Now()) {
- return ErrRetentionDateMustBeFuture
- }
- }
- // If mode is specified, retention date must also be specified
- if mode != "" && retainUntilDateStr == "" {
- return ErrObjectLockModeRequiresDate
- }
- // If retention date is specified, mode must also be specified
- if retainUntilDateStr != "" && mode == "" {
- return ErrRetentionDateRequiresMode
- }
- // Validate legal hold if present
- if legalHold != "" {
- if legalHold != s3_constants.LegalHoldOn && legalHold != s3_constants.LegalHoldOff {
- return ErrInvalidLegalHoldStatus
- }
- }
- // Check for governance bypass header - only valid for versioned buckets
- bypassGovernance := r.Header.Get("x-amz-bypass-governance-retention") == "true"
- // Governance bypass headers are only valid for versioned buckets (like object lock headers)
- if bypassGovernance && !versioningEnabled {
- return ErrGovernanceBypassVersioningRequired
- }
- return nil
- }
- // mapValidationErrorToS3Error maps object lock validation errors to appropriate S3 error codes
- func mapValidationErrorToS3Error(err error) s3err.ErrorCode {
- // Check for sentinel errors first
- switch {
- case errors.Is(err, ErrObjectLockVersioningRequired):
- // For object lock operations on non-versioned buckets, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrInvalidObjectLockMode):
- // For invalid object lock mode, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrInvalidLegalHoldStatus):
- // For invalid legal hold status in XML body, return MalformedXML
- // AWS S3 treats invalid status values in XML as malformed content
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrInvalidRetentionDateFormat):
- // For malformed retention date format, return MalformedDate
- // This matches the test expectations
- return s3err.ErrMalformedDate
- case errors.Is(err, ErrRetentionDateMustBeFuture):
- // For retention dates in the past, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrObjectLockModeRequiresDate):
- // For mode without retention date, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrRetentionDateRequiresMode):
- // For retention date without mode, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrGovernanceBypassVersioningRequired):
- // For governance bypass on non-versioned bucket, return InvalidRequest
- // This matches the test expectations
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrMalformedXML):
- // For malformed XML in request body, return MalformedXML
- // This matches the test expectations for invalid retention mode and legal hold status
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrInvalidRetentionPeriod):
- // For invalid retention period (e.g., Days <= 0), return InvalidRetentionPeriod
- // This matches the test expectations
- return s3err.ErrInvalidRetentionPeriod
- case errors.Is(err, ErrComplianceModeActive):
- // For compliance mode retention violations, return AccessDenied
- // This matches the test expectations
- return s3err.ErrAccessDenied
- case errors.Is(err, ErrGovernanceModeActive):
- // For governance mode retention violations, return AccessDenied
- // This matches the test expectations
- return s3err.ErrAccessDenied
- case errors.Is(err, ErrObjectUnderLegalHold):
- // For legal hold violations, return AccessDenied
- // This matches the test expectations
- return s3err.ErrAccessDenied
- case errors.Is(err, ErrGovernanceBypassNotPermitted):
- // For governance bypass permission violations, return AccessDenied
- // This matches the test expectations
- return s3err.ErrAccessDenied
- // Validation error constants
- case errors.Is(err, ErrObjectLockConfigurationMissingEnabled):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrInvalidObjectLockEnabledValue):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrRuleMissingDefaultRetention):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrDefaultRetentionMissingMode):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrInvalidDefaultRetentionMode):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrDefaultRetentionMissingPeriod):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrDefaultRetentionBothDaysAndYears):
- return s3err.ErrMalformedXML
- case errors.Is(err, ErrDefaultRetentionDaysOutOfRange):
- return s3err.ErrInvalidRetentionPeriod
- case errors.Is(err, ErrDefaultRetentionYearsOutOfRange):
- return s3err.ErrInvalidRetentionPeriod
- }
- // Check for error constants from the updated validation functions
- switch {
- case errors.Is(err, ErrRetentionMissingMode):
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrRetentionMissingRetainUntilDate):
- return s3err.ErrInvalidRequest
- case errors.Is(err, ErrInvalidRetentionModeValue):
- return s3err.ErrMalformedXML
- }
- return s3err.ErrInvalidRequest
- }
- // EntryGetter interface for dependency injection in tests
- // Simplified to only mock the data access dependency
- type EntryGetter interface {
- getEntry(parentDirectoryPath, entryName string) (*filer_pb.Entry, error)
- }
// conditionalHeaders is the parsed form of a request's conditional headers.
type conditionalHeaders struct {
	// Raw If-Match / If-None-Match header values; empty when absent.
	ifMatch, ifNoneMatch string
	// Parsed If-Modified-Since / If-Unmodified-Since values; zero when absent.
	ifModifiedSince, ifUnmodifiedSince time.Time
	// isSet is true when at least one conditional header was present.
	isSet bool
}
- // parseConditionalHeaders extracts and validates conditional headers from the request
- func parseConditionalHeaders(r *http.Request) (conditionalHeaders, s3err.ErrorCode) {
- headers := conditionalHeaders{
- ifMatch: r.Header.Get(s3_constants.IfMatch),
- ifNoneMatch: r.Header.Get(s3_constants.IfNoneMatch),
- }
- ifModifiedSinceStr := r.Header.Get(s3_constants.IfModifiedSince)
- ifUnmodifiedSinceStr := r.Header.Get(s3_constants.IfUnmodifiedSince)
- // Check if any conditional headers are present
- headers.isSet = headers.ifMatch != "" || headers.ifNoneMatch != "" ||
- ifModifiedSinceStr != "" || ifUnmodifiedSinceStr != ""
- if !headers.isSet {
- return headers, s3err.ErrNone
- }
- // Parse date headers with validation
- var err error
- if ifModifiedSinceStr != "" {
- headers.ifModifiedSince, err = time.Parse(time.RFC1123, ifModifiedSinceStr)
- if err != nil {
- glog.V(3).Infof("parseConditionalHeaders: Invalid If-Modified-Since format: %v", err)
- return headers, s3err.ErrInvalidRequest
- }
- }
- if ifUnmodifiedSinceStr != "" {
- headers.ifUnmodifiedSince, err = time.Parse(time.RFC1123, ifUnmodifiedSinceStr)
- if err != nil {
- glog.V(3).Infof("parseConditionalHeaders: Invalid If-Unmodified-Since format: %v", err)
- return headers, s3err.ErrInvalidRequest
- }
- }
- return headers, s3err.ErrNone
- }
- // S3ApiServer implements EntryGetter interface
- func (s3a *S3ApiServer) getObjectETag(entry *filer_pb.Entry) string {
- // Try to get ETag from Extended attributes first
- if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
- return string(etagBytes)
- }
- // Fallback: calculate ETag from chunks
- return s3a.calculateETagFromChunks(entry.Chunks)
- }
- func (s3a *S3ApiServer) etagMatches(headerValue, objectETag string) bool {
- // Clean the object ETag
- objectETag = strings.Trim(objectETag, `"`)
- // Split header value by commas to handle multiple ETags
- etags := strings.Split(headerValue, ",")
- for _, etag := range etags {
- etag = strings.TrimSpace(etag)
- etag = strings.Trim(etag, `"`)
- if etag == objectETag {
- return true
- }
- }
- return false
- }
- // checkConditionalHeadersWithGetter is a testable method that accepts a simple EntryGetter
- // Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
- func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r *http.Request, bucket, object string) s3err.ErrorCode {
- headers, errCode := parseConditionalHeaders(r)
- if errCode != s3err.ErrNone {
- glog.V(3).Infof("checkConditionalHeaders: Invalid date format")
- return errCode
- }
- if !headers.isSet {
- return s3err.ErrNone
- }
- // Get object entry for conditional checks.
- bucketDir := "/buckets/" + bucket
- entry, entryErr := getter.getEntry(bucketDir, object)
- objectExists := entryErr == nil
- // For PUT requests, all specified conditions must be met.
- // The evaluation order follows AWS S3 behavior for consistency.
- // 1. Check If-Match
- if headers.ifMatch != "" {
- if !objectExists {
- glog.V(3).Infof("checkConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object)
- return s3err.ErrPreconditionFailed
- }
- // If `ifMatch` is "*", the condition is met if the object exists.
- // Otherwise, we need to check the ETag.
- if headers.ifMatch != "*" {
- // Use production getObjectETag method
- objectETag := s3a.getObjectETag(entry)
- // Use production etagMatches method
- if !s3a.etagMatches(headers.ifMatch, objectETag) {
- glog.V(3).Infof("checkConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
- return s3err.ErrPreconditionFailed
- }
- }
- glog.V(3).Infof("checkConditionalHeaders: If-Match passed for object %s/%s", bucket, object)
- }
- // 2. Check If-Unmodified-Since
- if !headers.ifUnmodifiedSince.IsZero() {
- if objectExists {
- objectModTime := time.Unix(entry.Attributes.Mtime, 0)
- if objectModTime.After(headers.ifUnmodifiedSince) {
- glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
- return s3err.ErrPreconditionFailed
- }
- glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
- }
- }
- // 3. Check If-None-Match
- if headers.ifNoneMatch != "" {
- if objectExists {
- if headers.ifNoneMatch == "*" {
- glog.V(3).Infof("checkConditionalHeaders: If-None-Match=* failed - object %s/%s exists", bucket, object)
- return s3err.ErrPreconditionFailed
- }
- // Use production getObjectETag method
- objectETag := s3a.getObjectETag(entry)
- // Use production etagMatches method
- if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
- glog.V(3).Infof("checkConditionalHeaders: If-None-Match failed - ETag matches %s", objectETag)
- return s3err.ErrPreconditionFailed
- }
- glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
- } else {
- glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - object %s/%s does not exist", bucket, object)
- }
- }
- // 4. Check If-Modified-Since
- if !headers.ifModifiedSince.IsZero() {
- if objectExists {
- objectModTime := time.Unix(entry.Attributes.Mtime, 0)
- if !objectModTime.After(headers.ifModifiedSince) {
- glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
- return s3err.ErrPreconditionFailed
- }
- glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
- }
- }
- return s3err.ErrNone
- }
- // checkConditionalHeaders is the production method that uses the S3ApiServer as EntryGetter
- func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object string) s3err.ErrorCode {
- return s3a.checkConditionalHeadersWithGetter(s3a, r, bucket, object)
- }
- // checkConditionalHeadersForReadsWithGetter is a testable method for read operations
- // Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
- func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGetter, r *http.Request, bucket, object string) ConditionalHeaderResult {
- headers, errCode := parseConditionalHeaders(r)
- if errCode != s3err.ErrNone {
- glog.V(3).Infof("checkConditionalHeadersForReads: Invalid date format")
- return ConditionalHeaderResult{ErrorCode: errCode}
- }
- if !headers.isSet {
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
- }
- // Get object entry for conditional checks.
- bucketDir := "/buckets/" + bucket
- entry, entryErr := getter.getEntry(bucketDir, object)
- objectExists := entryErr == nil
- // If object doesn't exist, fail for If-Match and If-Unmodified-Since
- if !objectExists {
- if headers.ifMatch != "" {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed - object %s/%s does not exist", bucket, object)
- return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
- }
- if !headers.ifUnmodifiedSince.IsZero() {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object %s/%s does not exist", bucket, object)
- return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
- }
- // If-None-Match and If-Modified-Since succeed when object doesn't exist
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
- }
- // Object exists - check all conditions
- // The evaluation order follows AWS S3 behavior for consistency.
- // 1. Check If-Match (412 Precondition Failed if fails)
- if headers.ifMatch != "" {
- // If `ifMatch` is "*", the condition is met if the object exists.
- // Otherwise, we need to check the ETag.
- if headers.ifMatch != "*" {
- // Use production getObjectETag method
- objectETag := s3a.getObjectETag(entry)
- // Use production etagMatches method
- if !s3a.etagMatches(headers.ifMatch, objectETag) {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
- return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
- }
- }
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Match passed for object %s/%s", bucket, object)
- }
- // 2. Check If-Unmodified-Since (412 Precondition Failed if fails)
- if !headers.ifUnmodifiedSince.IsZero() {
- objectModTime := time.Unix(entry.Attributes.Mtime, 0)
- if objectModTime.After(headers.ifUnmodifiedSince) {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
- return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
- }
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
- }
- // 3. Check If-None-Match (304 Not Modified if fails)
- if headers.ifNoneMatch != "" {
- // Use production getObjectETag method
- objectETag := s3a.getObjectETag(entry)
- if headers.ifNoneMatch == "*" {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match=* failed - object %s/%s exists", bucket, object)
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
- }
- // Use production etagMatches method
- if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
- glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match failed - ETag matches %s", objectETag)
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
- }
- glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
- }
- // 4. Check If-Modified-Since (304 Not Modified if fails)
- if !headers.ifModifiedSince.IsZero() {
- objectModTime := time.Unix(entry.Attributes.Mtime, 0)
- if !objectModTime.After(headers.ifModifiedSince) {
- // Use production getObjectETag method
- objectETag := s3a.getObjectETag(entry)
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
- }
- glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
- }
- return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
- }
- // checkConditionalHeadersForReads is the production method that uses the S3ApiServer as EntryGetter
- func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
- return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
- }
|