s3api_object_handlers_put.go 52 KB


  1. package s3api
  2. import (
  3. "crypto/md5"
  4. "encoding/base64"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/pquerna/cachecontrol/cacheobject"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  18. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  19. "github.com/seaweedfs/seaweedfs/weed/security"
  20. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  21. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  22. "github.com/seaweedfs/seaweedfs/weed/util/constants"
  23. )
  24. // Object lock validation errors
  25. var (
  26. ErrObjectLockVersioningRequired = errors.New("object lock headers can only be used on versioned buckets")
  27. ErrInvalidObjectLockMode = errors.New("invalid object lock mode")
  28. ErrInvalidLegalHoldStatus = errors.New("invalid legal hold status")
  29. ErrInvalidRetentionDateFormat = errors.New("invalid retention until date format")
  30. ErrRetentionDateMustBeFuture = errors.New("retain until date must be in the future")
  31. ErrObjectLockModeRequiresDate = errors.New("object lock mode requires retention until date")
  32. ErrRetentionDateRequiresMode = errors.New("retention until date requires object lock mode")
  33. ErrGovernanceBypassVersioningRequired = errors.New("governance bypass header can only be used on versioned buckets")
  34. ErrInvalidObjectLockDuration = errors.New("object lock duration must be greater than 0 days")
  35. ErrObjectLockDurationExceeded = errors.New("object lock duration exceeds maximum allowed days")
  36. ErrObjectLockConfigurationMissingEnabled = errors.New("object lock configuration must specify ObjectLockEnabled")
  37. ErrInvalidObjectLockEnabledValue = errors.New("invalid object lock enabled value")
  38. ErrRuleMissingDefaultRetention = errors.New("rule configuration must specify DefaultRetention")
  39. ErrDefaultRetentionMissingMode = errors.New("default retention must specify Mode")
  40. ErrInvalidDefaultRetentionMode = errors.New("invalid default retention mode")
  41. ErrDefaultRetentionMissingPeriod = errors.New("default retention must specify either Days or Years")
  42. ErrDefaultRetentionBothDaysAndYears = errors.New("default retention cannot specify both Days and Years")
  43. ErrDefaultRetentionDaysOutOfRange = errors.New("default retention days must be between 0 and 36500")
  44. ErrDefaultRetentionYearsOutOfRange = errors.New("default retention years must be between 0 and 100")
  45. )
  46. // hasExplicitEncryption checks if any explicit encryption was provided in the request.
  47. // This helper improves readability and makes the encryption check condition more explicit.
  48. func hasExplicitEncryption(customerKey *SSECustomerKey, sseKMSKey *SSEKMSKey, sseS3Key *SSES3Key) bool {
  49. return customerKey != nil || sseKMSKey != nil || sseS3Key != nil
  50. }
// BucketDefaultEncryptionResult holds the result of bucket default encryption processing.
// It is returned by applyBucketDefaultEncryption and consumed by putToFiler.
type BucketDefaultEncryptionResult struct {
	// DataReader is the reader the caller must upload from after default
	// encryption processing; when no default encryption is configured it is
	// the reader that was passed in.
	DataReader io.Reader
	// SSES3Key is left nil when no default encryption is configured.
	SSES3Key *SSES3Key
	// SSEKMSKey is left nil when no default encryption is configured.
	SSEKMSKey *SSEKMSKey
}
  57. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  58. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  59. bucket, object := s3_constants.GetBucketAndObject(r)
  60. authHeader := r.Header.Get("Authorization")
  61. authPreview := authHeader
  62. if len(authHeader) > 50 {
  63. authPreview = authHeader[:50] + "..."
  64. }
  65. glog.V(0).Infof("PutObjectHandler: Starting PUT %s/%s (Auth: %s)", bucket, object, authPreview)
  66. glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
  67. _, err := validateContentMd5(r.Header)
  68. if err != nil {
  69. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
  70. return
  71. }
  72. // Check conditional headers
  73. if errCode := s3a.checkConditionalHeaders(r, bucket, object); errCode != s3err.ErrNone {
  74. s3err.WriteErrorResponse(w, r, errCode)
  75. return
  76. }
  77. if r.Header.Get("Cache-Control") != "" {
  78. if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil {
  79. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
  80. return
  81. }
  82. }
  83. if r.Header.Get("Expires") != "" {
  84. if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
  85. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedDate)
  86. return
  87. }
  88. }
  89. dataReader, s3ErrCode := getRequestDataReader(s3a, r)
  90. if s3ErrCode != s3err.ErrNone {
  91. s3err.WriteErrorResponse(w, r, s3ErrCode)
  92. return
  93. }
  94. defer dataReader.Close()
  95. objectContentType := r.Header.Get("Content-Type")
  96. if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
  97. if err := s3a.mkdir(
  98. s3a.option.BucketsPath, bucket+strings.TrimSuffix(object, "/"),
  99. func(entry *filer_pb.Entry) {
  100. if objectContentType == "" {
  101. objectContentType = s3_constants.FolderMimeType
  102. }
  103. if r.ContentLength > 0 {
  104. entry.Content, _ = io.ReadAll(r.Body)
  105. }
  106. entry.Attributes.Mime = objectContentType
  107. // Set object owner for directory objects (same as regular objects)
  108. s3a.setObjectOwnerFromRequest(r, entry)
  109. }); err != nil {
  110. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  111. return
  112. }
  113. } else {
  114. // Get detailed versioning state for the bucket
  115. versioningState, err := s3a.getVersioningState(bucket)
  116. if err != nil {
  117. if errors.Is(err, filer_pb.ErrNotFound) {
  118. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  119. return
  120. }
  121. glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
  122. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  123. return
  124. }
  125. versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
  126. versioningConfigured := (versioningState != "")
  127. glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState)
  128. // Validate object lock headers before processing
  129. if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
  130. glog.V(2).Infof("PutObjectHandler: object lock header validation failed for bucket %s, object %s: %v", bucket, object, err)
  131. s3err.WriteErrorResponse(w, r, mapValidationErrorToS3Error(err))
  132. return
  133. }
  134. // For non-versioned buckets, check if existing object has object lock protections
  135. // that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets)
  136. if !versioningConfigured {
  137. governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
  138. if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
  139. glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err)
  140. s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
  141. return
  142. }
  143. }
  144. if versioningState == s3_constants.VersioningEnabled {
  145. // Handle enabled versioning - create new versions with real version IDs
  146. glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
  147. versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
  148. if errCode != s3err.ErrNone {
  149. s3err.WriteErrorResponse(w, r, errCode)
  150. return
  151. }
  152. // Set version ID in response header
  153. if versionId != "" {
  154. w.Header().Set("x-amz-version-id", versionId)
  155. }
  156. // Set ETag in response
  157. setEtag(w, etag)
  158. } else if versioningState == s3_constants.VersioningSuspended {
  159. // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
  160. glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object)
  161. etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
  162. if errCode != s3err.ErrNone {
  163. s3err.WriteErrorResponse(w, r, errCode)
  164. return
  165. }
  166. // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec
  167. // The object is stored with "null" version internally but no version header is returned
  168. // Set ETag in response
  169. setEtag(w, etag)
  170. } else {
  171. // Handle regular PUT (never configured versioning)
  172. glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
  173. uploadUrl := s3a.toFilerUrl(bucket, object)
  174. if objectContentType == "" {
  175. dataReader = mimeDetect(r, dataReader)
  176. }
  177. etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
  178. if errCode != s3err.ErrNone {
  179. s3err.WriteErrorResponse(w, r, errCode)
  180. return
  181. }
  182. // No version ID header for never-configured versioning
  183. setEtag(w, etag)
  184. // Set SSE response headers based on encryption type used
  185. if sseType == s3_constants.SSETypeS3 {
  186. w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
  187. }
  188. }
  189. }
  190. stats_collect.RecordBucketActiveTime(bucket)
  191. stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
  192. writeSuccessResponseEmpty(w, r)
  193. }
  194. func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
  195. // Calculate unique offset for each part to prevent IV reuse in multipart uploads
  196. // This is critical for CTR mode encryption security
  197. partOffset := calculatePartOffset(partNumber)
  198. // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
  199. sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
  200. if sseErrorCode != s3err.ErrNone {
  201. return "", sseErrorCode, ""
  202. }
  203. // Extract results from unified SSE handling
  204. dataReader = sseResult.DataReader
  205. customerKey := sseResult.CustomerKey
  206. sseIV := sseResult.SSEIV
  207. sseKMSKey := sseResult.SSEKMSKey
  208. sseKMSMetadata := sseResult.SSEKMSMetadata
  209. sseS3Key := sseResult.SSES3Key
  210. sseS3Metadata := sseResult.SSES3Metadata
  211. // Apply bucket default encryption if no explicit encryption was provided
  212. // This implements AWS S3 behavior where bucket default encryption automatically applies
  213. if !hasExplicitEncryption(customerKey, sseKMSKey, sseS3Key) {
  214. glog.V(4).Infof("putToFiler: no explicit encryption detected, checking for bucket default encryption")
  215. // Apply bucket default encryption and get the result
  216. encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
  217. if applyErr != nil {
  218. glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
  219. return "", s3err.ErrInternalError, ""
  220. }
  221. // Update variables based on the result
  222. dataReader = encryptionResult.DataReader
  223. sseS3Key = encryptionResult.SSES3Key
  224. sseKMSKey = encryptionResult.SSEKMSKey
  225. // If SSE-S3 was applied by bucket default, prepare metadata (if not already done)
  226. if sseS3Key != nil && len(sseS3Metadata) == 0 {
  227. var metaErr error
  228. sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
  229. if metaErr != nil {
  230. glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
  231. return "", s3err.ErrInternalError, ""
  232. }
  233. }
  234. } else {
  235. glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
  236. }
  237. hash := md5.New()
  238. var body = io.TeeReader(dataReader, hash)
  239. proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
  240. if err != nil {
  241. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  242. return "", s3err.ErrInternalError, ""
  243. }
  244. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  245. if destination != "" {
  246. proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
  247. }
  248. if s3a.option.FilerGroup != "" {
  249. query := proxyReq.URL.Query()
  250. query.Add("collection", s3a.getCollectionName(bucket))
  251. proxyReq.URL.RawQuery = query.Encode()
  252. }
  253. for header, values := range r.Header {
  254. for _, value := range values {
  255. proxyReq.Header.Add(header, value)
  256. }
  257. }
  258. // Set object owner header for filer to extract
  259. amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
  260. if amzAccountId != "" {
  261. proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
  262. glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
  263. }
  264. // Set SSE-C metadata headers for the filer if encryption was applied
  265. if customerKey != nil && len(sseIV) > 0 {
  266. proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
  267. proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
  268. // Store IV in a custom header that the filer can use to store in entry metadata
  269. proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
  270. }
  271. // Set SSE-KMS metadata headers for the filer if KMS encryption was applied
  272. if sseKMSKey != nil {
  273. // Use already-serialized SSE-KMS metadata from helper function
  274. // Store serialized KMS metadata in a custom header that the filer can use
  275. proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
  276. glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
  277. } else {
  278. glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
  279. }
  280. // Set SSE-S3 metadata headers for the filer if S3 encryption was applied
  281. if sseS3Key != nil && len(sseS3Metadata) > 0 {
  282. // Store serialized S3 metadata in a custom header that the filer can use
  283. proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
  284. glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
  285. }
  286. // ensure that the Authorization header is overriding any previous
  287. // Authorization header which might be already present in proxyReq
  288. s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
  289. resp, postErr := s3a.client.Do(proxyReq)
  290. if postErr != nil {
  291. glog.Errorf("post to filer: %v", postErr)
  292. if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
  293. return "", s3err.ErrInvalidDigest, ""
  294. }
  295. return "", s3err.ErrInternalError, ""
  296. }
  297. defer resp.Body.Close()
  298. etag = fmt.Sprintf("%x", hash.Sum(nil))
  299. resp_body, ra_err := io.ReadAll(resp.Body)
  300. if ra_err != nil {
  301. glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
  302. return etag, s3err.ErrInternalError, ""
  303. }
  304. var ret weed_server.FilerPostResult
  305. unmarshal_err := json.Unmarshal(resp_body, &ret)
  306. if unmarshal_err != nil {
  307. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  308. return "", s3err.ErrInternalError, ""
  309. }
  310. if ret.Error != "" {
  311. glog.Errorf("upload to filer error: %v", ret.Error)
  312. return "", filerErrorToS3Error(ret.Error), ""
  313. }
  314. BucketTrafficReceived(ret.Size, r)
  315. // Return the SSE type determined by the unified handler
  316. return etag, s3err.ErrNone, sseResult.SSEType
  317. }
  318. func setEtag(w http.ResponseWriter, etag string) {
  319. if etag != "" {
  320. if strings.HasPrefix(etag, "\"") {
  321. w.Header()["ETag"] = []string{etag}
  322. } else {
  323. w.Header()["ETag"] = []string{"\"" + etag + "\""}
  324. }
  325. }
  326. }
  327. func filerErrorToS3Error(errString string) s3err.ErrorCode {
  328. switch {
  329. case errString == constants.ErrMsgBadDigest:
  330. return s3err.ErrBadDigest
  331. case strings.Contains(errString, "context canceled") || strings.Contains(errString, "code = Canceled"):
  332. // Client canceled the request, return client error not server error
  333. return s3err.ErrInvalidRequest
  334. case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"):
  335. return s3err.ErrExistingObjectIsDirectory
  336. case strings.HasSuffix(errString, "is a file"):
  337. return s3err.ErrExistingObjectIsFile
  338. default:
  339. return s3err.ErrInternalError
  340. }
  341. }
  342. func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
  343. encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
  344. if encodedJwt == "" {
  345. return
  346. }
  347. r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
  348. }
  349. func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
  350. var encodedJwt security.EncodedJwt
  351. if isWrite {
  352. encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
  353. } else {
  354. encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
  355. }
  356. return string(encodedJwt)
  357. }
  358. // setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
  359. func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
  360. amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
  361. if amzAccountId != "" {
  362. if entry.Extended == nil {
  363. entry.Extended = make(map[string][]byte)
  364. }
  365. entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
  366. glog.V(2).Infof("setObjectOwnerFromRequest: set object owner to %s", amzAccountId)
  367. }
  368. }
  369. // putVersionedObject handles PUT operations for versioned buckets using the new layout
  370. // where all versions (including latest) are stored in the .versions directory
  371. func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
  372. // For suspended versioning, store as regular object (version ID "null") but preserve existing versions
  373. glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object)
  374. uploadUrl := s3a.toFilerUrl(bucket, object)
  375. if objectContentType == "" {
  376. dataReader = mimeDetect(r, dataReader)
  377. }
  378. etag, errCode, _ = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
  379. if errCode != s3err.ErrNone {
  380. glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
  381. return "", errCode
  382. }
  383. // Get the uploaded entry to add version metadata indicating this is "null" version
  384. bucketDir := s3a.option.BucketsPath + "/" + bucket
  385. entry, err := s3a.getEntry(bucketDir, object)
  386. if err != nil {
  387. glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err)
  388. return "", s3err.ErrInternalError
  389. }
  390. // Add metadata to indicate this is a "null" version for suspended versioning
  391. if entry.Extended == nil {
  392. entry.Extended = make(map[string][]byte)
  393. }
  394. entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
  395. // Set object owner for suspended versioning objects
  396. s3a.setObjectOwnerFromRequest(r, entry)
  397. // Extract and store object lock metadata from request headers (if any)
  398. if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
  399. glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
  400. return "", s3err.ErrInvalidRequest
  401. }
  402. // Update the entry with metadata
  403. err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
  404. updatedEntry.Extended = entry.Extended
  405. updatedEntry.Attributes = entry.Attributes
  406. updatedEntry.Chunks = entry.Chunks
  407. })
  408. if err != nil {
  409. glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err)
  410. return "", s3err.ErrInternalError
  411. }
  412. // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
  413. err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object)
  414. if err != nil {
  415. glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
  416. // Don't fail the request, but log the warning
  417. }
  418. glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
  419. return etag, s3err.ErrNone
  420. }
  421. // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
  422. // when a new "null" version becomes the latest during suspended versioning
  423. func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
  424. bucketDir := s3a.option.BucketsPath + "/" + bucket
  425. versionsObjectPath := object + ".versions"
  426. versionsDir := bucketDir + "/" + versionsObjectPath
  427. glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s%s", bucket, object)
  428. // Check if .versions directory exists
  429. _, err := s3a.getEntry(bucketDir, versionsObjectPath)
  430. if err != nil {
  431. // No .versions directory exists, nothing to update
  432. glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s%s", bucket, object)
  433. return nil
  434. }
  435. // List all entries in .versions directory
  436. entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
  437. if err != nil {
  438. return fmt.Errorf("failed to list versions directory: %v", err)
  439. }
  440. glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: found %d entries to update", len(entries))
  441. // Update each version/delete marker to set IsLatest=false
  442. for _, entry := range entries {
  443. if entry.Extended == nil {
  444. continue
  445. }
  446. // Check if this entry has a version ID (it should be a version or delete marker)
  447. versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
  448. if !hasVersionId {
  449. continue
  450. }
  451. versionId := string(versionIdBytes)
  452. glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: setting IsLatest=false for version %s", versionId)
  453. // Update the entry to set IsLatest=false (we don't explicitly store this flag,
  454. // it's determined by comparison with latest version metadata)
  455. // We need to clear the latest version metadata from the .versions directory
  456. // so that our getObjectVersionList function will correctly show IsLatest=false
  457. }
  458. // Clear the latest version metadata from .versions directory since "null" is now latest
  459. versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
  460. if err == nil && versionsEntry.Extended != nil {
  461. // Remove latest version metadata so all versions show IsLatest=false
  462. delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
  463. delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
  464. // Update the .versions directory entry
  465. err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
  466. updatedEntry.Extended = versionsEntry.Extended
  467. updatedEntry.Attributes = versionsEntry.Attributes
  468. updatedEntry.Chunks = versionsEntry.Chunks
  469. })
  470. if err != nil {
  471. return fmt.Errorf("failed to update .versions directory metadata: %v", err)
  472. }
  473. glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s%s", bucket, object)
  474. }
  475. return nil
  476. }
  477. func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
  478. // Generate version ID
  479. versionId = generateVersionId()
  480. glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
  481. // Create the version file name
  482. versionFileName := s3a.getVersionFileName(versionId)
  483. // Upload directly to the versions directory
  484. // We need to construct the object path relative to the bucket
  485. versionObjectPath := object + ".versions/" + versionFileName
  486. versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
  487. hash := md5.New()
  488. var body = io.TeeReader(dataReader, hash)
  489. if objectContentType == "" {
  490. body = mimeDetect(r, body)
  491. }
  492. glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
  493. etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
  494. if errCode != s3err.ErrNone {
  495. glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
  496. return "", "", errCode
  497. }
  498. // Get the uploaded entry to add versioning metadata
  499. bucketDir := s3a.option.BucketsPath + "/" + bucket
  500. versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
  501. if err != nil {
  502. glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
  503. return "", "", s3err.ErrInternalError
  504. }
  505. // Add versioning metadata to this version
  506. if versionEntry.Extended == nil {
  507. versionEntry.Extended = make(map[string][]byte)
  508. }
  509. versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
  510. // Store ETag with quotes for S3 compatibility
  511. if !strings.HasPrefix(etag, "\"") {
  512. etag = "\"" + etag + "\""
  513. }
  514. versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
  515. // Set object owner for versioned objects
  516. s3a.setObjectOwnerFromRequest(r, versionEntry)
  517. // Extract and store object lock metadata from request headers
  518. if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
  519. glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
  520. return "", "", s3err.ErrInvalidRequest
  521. }
  522. // Update the version entry with metadata
  523. err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
  524. updatedEntry.Extended = versionEntry.Extended
  525. updatedEntry.Attributes = versionEntry.Attributes
  526. updatedEntry.Chunks = versionEntry.Chunks
  527. })
  528. if err != nil {
  529. glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
  530. return "", "", s3err.ErrInternalError
  531. }
  532. // Update the .versions directory metadata to indicate this is the latest version
  533. err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
  534. if err != nil {
  535. glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
  536. return "", "", s3err.ErrInternalError
  537. }
  538. glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
  539. return versionId, etag, s3err.ErrNone
  540. }
  541. // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
  542. func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
  543. bucketDir := s3a.option.BucketsPath + "/" + bucket
  544. versionsObjectPath := object + ".versions"
  545. // Get the current .versions directory entry
  546. versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
  547. if err != nil {
  548. glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
  549. return fmt.Errorf("failed to get .versions entry: %w", err)
  550. }
  551. // Add or update the latest version metadata
  552. if versionsEntry.Extended == nil {
  553. versionsEntry.Extended = make(map[string][]byte)
  554. }
  555. versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId)
  556. versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName)
  557. // Update the .versions directory entry with metadata
  558. err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
  559. updatedEntry.Extended = versionsEntry.Extended
  560. updatedEntry.Attributes = versionsEntry.Attributes
  561. updatedEntry.Chunks = versionsEntry.Chunks
  562. })
  563. if err != nil {
  564. glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err)
  565. return fmt.Errorf("failed to update .versions directory metadata: %w", err)
  566. }
  567. return nil
  568. }
  569. // extractObjectLockMetadataFromRequest extracts object lock headers from PUT requests
  570. // and applies bucket default retention if no explicit retention is provided
  571. func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, entry *filer_pb.Entry) error {
  572. if entry.Extended == nil {
  573. entry.Extended = make(map[string][]byte)
  574. }
  575. // Extract explicit object lock mode (GOVERNANCE or COMPLIANCE)
  576. explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
  577. if explicitMode != "" {
  578. entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(explicitMode)
  579. glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit object lock mode: %s", explicitMode)
  580. }
  581. // Extract explicit retention until date
  582. explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
  583. if explicitRetainUntilDate != "" {
  584. // Parse the ISO8601 date and convert to Unix timestamp for storage
  585. parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
  586. if err != nil {
  587. glog.Errorf("extractObjectLockMetadataFromRequest: failed to parse retention until date, expected format: %s, error: %v", time.RFC3339, err)
  588. return ErrInvalidRetentionDateFormat
  589. }
  590. entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(parsedTime.Unix(), 10))
  591. glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing explicit retention until date (timestamp: %d)", parsedTime.Unix())
  592. }
  593. // Extract legal hold status
  594. if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" {
  595. // Store S3 standard "ON"/"OFF" values directly
  596. if legalHold == s3_constants.LegalHoldOn || legalHold == s3_constants.LegalHoldOff {
  597. entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold)
  598. glog.V(2).Infof("extractObjectLockMetadataFromRequest: storing legal hold: %s", legalHold)
  599. } else {
  600. glog.Errorf("extractObjectLockMetadataFromRequest: unexpected legal hold value provided, expected 'ON' or 'OFF'")
  601. return ErrInvalidLegalHoldStatus
  602. }
  603. }
  604. // Apply bucket default retention if no explicit retention was provided
  605. // This implements AWS S3 behavior where bucket default retention automatically applies to new objects
  606. if explicitMode == "" && explicitRetainUntilDate == "" {
  607. bucket, _ := s3_constants.GetBucketAndObject(r)
  608. if err := s3a.applyBucketDefaultRetention(bucket, entry); err != nil {
  609. glog.V(2).Infof("extractObjectLockMetadataFromRequest: skipping bucket default retention for %s: %v", bucket, err)
  610. // Don't fail the upload if default retention can't be applied - this matches AWS behavior
  611. }
  612. }
  613. return nil
  614. }
  615. // applyBucketDefaultEncryption applies bucket default encryption settings to a new object
  616. // This implements AWS S3 behavior where bucket default encryption automatically applies to new objects
  617. // when no explicit encryption headers are provided in the upload request.
  618. // Returns the modified dataReader and encryption keys instead of using pointer parameters for better code clarity.
  619. func (s3a *S3ApiServer) applyBucketDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
  620. // Check if bucket has default encryption configured
  621. encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
  622. if err != nil || encryptionConfig == nil {
  623. // No default encryption configured, return original reader
  624. return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
  625. }
  626. if encryptionConfig.SseAlgorithm == "" {
  627. // No encryption algorithm specified
  628. return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
  629. }
  630. glog.V(3).Infof("applyBucketDefaultEncryption: applying default encryption %s for bucket %s", encryptionConfig.SseAlgorithm, bucket)
  631. switch encryptionConfig.SseAlgorithm {
  632. case EncryptionTypeAES256:
  633. // Apply SSE-S3 (AES256) encryption
  634. return s3a.applySSES3DefaultEncryption(dataReader)
  635. case EncryptionTypeKMS:
  636. // Apply SSE-KMS encryption
  637. return s3a.applySSEKMSDefaultEncryption(bucket, r, dataReader, encryptionConfig)
  638. default:
  639. return nil, fmt.Errorf("unsupported default encryption algorithm: %s", encryptionConfig.SseAlgorithm)
  640. }
  641. }
  642. // applySSES3DefaultEncryption applies SSE-S3 encryption as bucket default
  643. func (s3a *S3ApiServer) applySSES3DefaultEncryption(dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
  644. // Generate SSE-S3 key
  645. keyManager := GetSSES3KeyManager()
  646. key, err := keyManager.GetOrCreateKey("")
  647. if err != nil {
  648. return nil, fmt.Errorf("failed to generate SSE-S3 key for default encryption: %v", err)
  649. }
  650. // Create encrypted reader
  651. encryptedReader, iv, encErr := CreateSSES3EncryptedReader(dataReader, key)
  652. if encErr != nil {
  653. return nil, fmt.Errorf("failed to create SSE-S3 encrypted reader for default encryption: %v", encErr)
  654. }
  655. // Store IV on the key object for later decryption
  656. key.IV = iv
  657. // Store key in manager for later retrieval
  658. keyManager.StoreKey(key)
  659. glog.V(3).Infof("applySSES3DefaultEncryption: applied SSE-S3 default encryption with key ID: %s", key.KeyID)
  660. return &BucketDefaultEncryptionResult{
  661. DataReader: encryptedReader,
  662. SSES3Key: key,
  663. }, nil
  664. }
  665. // applySSEKMSDefaultEncryption applies SSE-KMS encryption as bucket default
  666. func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader, encryptionConfig *s3_pb.EncryptionConfiguration) (*BucketDefaultEncryptionResult, error) {
  667. // Use the KMS key ID from bucket configuration, or default if not specified
  668. keyID := encryptionConfig.KmsKeyId
  669. if keyID == "" {
  670. keyID = "alias/aws/s3" // AWS default KMS key for S3
  671. }
  672. // Check if bucket key is enabled in configuration
  673. bucketKeyEnabled := encryptionConfig.BucketKeyEnabled
  674. // Build encryption context for KMS
  675. bucket, object := s3_constants.GetBucketAndObject(r)
  676. encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled)
  677. // Create SSE-KMS encrypted reader
  678. encryptedReader, sseKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(dataReader, keyID, encryptionContext, bucketKeyEnabled)
  679. if encErr != nil {
  680. return nil, fmt.Errorf("failed to create SSE-KMS encrypted reader for default encryption: %v", encErr)
  681. }
  682. glog.V(3).Infof("applySSEKMSDefaultEncryption: applied SSE-KMS default encryption with key ID: %s", keyID)
  683. return &BucketDefaultEncryptionResult{
  684. DataReader: encryptedReader,
  685. SSEKMSKey: sseKey,
  686. }, nil
  687. }
  688. // applyBucketDefaultRetention applies bucket default retention settings to a new object
  689. // This implements AWS S3 behavior where bucket default retention automatically applies to new objects
  690. // when no explicit retention headers are provided in the upload request
  691. func (s3a *S3ApiServer) applyBucketDefaultRetention(bucket string, entry *filer_pb.Entry) error {
  692. // Safety check - if bucket config cache is not available, skip default retention
  693. if s3a.bucketConfigCache == nil {
  694. return nil
  695. }
  696. // Get bucket configuration (getBucketConfig handles caching internally)
  697. bucketConfig, errCode := s3a.getBucketConfig(bucket)
  698. if errCode != s3err.ErrNone {
  699. return fmt.Errorf("failed to get bucket config: %v", errCode)
  700. }
  701. // Check if bucket has cached Object Lock configuration
  702. if bucketConfig.ObjectLockConfig == nil {
  703. return nil // No Object Lock configuration
  704. }
  705. objectLockConfig := bucketConfig.ObjectLockConfig
  706. // Check if there's a default retention rule
  707. if objectLockConfig.Rule == nil || objectLockConfig.Rule.DefaultRetention == nil {
  708. return nil // No default retention configured
  709. }
  710. defaultRetention := objectLockConfig.Rule.DefaultRetention
  711. // Validate default retention has required fields
  712. if defaultRetention.Mode == "" {
  713. return fmt.Errorf("default retention missing mode")
  714. }
  715. if !defaultRetention.DaysSet && !defaultRetention.YearsSet {
  716. return fmt.Errorf("default retention missing period")
  717. }
  718. // Calculate retention until date based on default retention period
  719. var retainUntilDate time.Time
  720. now := time.Now()
  721. if defaultRetention.DaysSet && defaultRetention.Days > 0 {
  722. retainUntilDate = now.AddDate(0, 0, defaultRetention.Days)
  723. } else if defaultRetention.YearsSet && defaultRetention.Years > 0 {
  724. retainUntilDate = now.AddDate(defaultRetention.Years, 0, 0)
  725. }
  726. // Apply default retention to the object
  727. if entry.Extended == nil {
  728. entry.Extended = make(map[string][]byte)
  729. }
  730. entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(defaultRetention.Mode)
  731. entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retainUntilDate.Unix(), 10))
  732. glog.V(2).Infof("applyBucketDefaultRetention: applied default retention %s until %s for bucket %s",
  733. defaultRetention.Mode, retainUntilDate.Format(time.RFC3339), bucket)
  734. return nil
  735. }
  736. // validateObjectLockHeaders validates object lock headers in PUT requests
  737. func (s3a *S3ApiServer) validateObjectLockHeaders(r *http.Request, versioningEnabled bool) error {
  738. // Extract object lock headers from request
  739. mode := r.Header.Get(s3_constants.AmzObjectLockMode)
  740. retainUntilDateStr := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
  741. legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold)
  742. // Check if any object lock headers are present
  743. hasObjectLockHeaders := mode != "" || retainUntilDateStr != "" || legalHold != ""
  744. // Object lock headers can only be used on versioned buckets
  745. if hasObjectLockHeaders && !versioningEnabled {
  746. return ErrObjectLockVersioningRequired
  747. }
  748. // Validate object lock mode if present
  749. if mode != "" {
  750. if mode != s3_constants.RetentionModeGovernance && mode != s3_constants.RetentionModeCompliance {
  751. return ErrInvalidObjectLockMode
  752. }
  753. }
  754. // Validate retention date if present
  755. if retainUntilDateStr != "" {
  756. retainUntilDate, err := time.Parse(time.RFC3339, retainUntilDateStr)
  757. if err != nil {
  758. return ErrInvalidRetentionDateFormat
  759. }
  760. // Retention date must be in the future
  761. if retainUntilDate.Before(time.Now()) {
  762. return ErrRetentionDateMustBeFuture
  763. }
  764. }
  765. // If mode is specified, retention date must also be specified
  766. if mode != "" && retainUntilDateStr == "" {
  767. return ErrObjectLockModeRequiresDate
  768. }
  769. // If retention date is specified, mode must also be specified
  770. if retainUntilDateStr != "" && mode == "" {
  771. return ErrRetentionDateRequiresMode
  772. }
  773. // Validate legal hold if present
  774. if legalHold != "" {
  775. if legalHold != s3_constants.LegalHoldOn && legalHold != s3_constants.LegalHoldOff {
  776. return ErrInvalidLegalHoldStatus
  777. }
  778. }
  779. // Check for governance bypass header - only valid for versioned buckets
  780. bypassGovernance := r.Header.Get("x-amz-bypass-governance-retention") == "true"
  781. // Governance bypass headers are only valid for versioned buckets (like object lock headers)
  782. if bypassGovernance && !versioningEnabled {
  783. return ErrGovernanceBypassVersioningRequired
  784. }
  785. return nil
  786. }
  787. // mapValidationErrorToS3Error maps object lock validation errors to appropriate S3 error codes
  788. func mapValidationErrorToS3Error(err error) s3err.ErrorCode {
  789. // Check for sentinel errors first
  790. switch {
  791. case errors.Is(err, ErrObjectLockVersioningRequired):
  792. // For object lock operations on non-versioned buckets, return InvalidRequest
  793. // This matches the test expectations
  794. return s3err.ErrInvalidRequest
  795. case errors.Is(err, ErrInvalidObjectLockMode):
  796. // For invalid object lock mode, return InvalidRequest
  797. // This matches the test expectations
  798. return s3err.ErrInvalidRequest
  799. case errors.Is(err, ErrInvalidLegalHoldStatus):
  800. // For invalid legal hold status in XML body, return MalformedXML
  801. // AWS S3 treats invalid status values in XML as malformed content
  802. return s3err.ErrMalformedXML
  803. case errors.Is(err, ErrInvalidRetentionDateFormat):
  804. // For malformed retention date format, return MalformedDate
  805. // This matches the test expectations
  806. return s3err.ErrMalformedDate
  807. case errors.Is(err, ErrRetentionDateMustBeFuture):
  808. // For retention dates in the past, return InvalidRequest
  809. // This matches the test expectations
  810. return s3err.ErrInvalidRequest
  811. case errors.Is(err, ErrObjectLockModeRequiresDate):
  812. // For mode without retention date, return InvalidRequest
  813. // This matches the test expectations
  814. return s3err.ErrInvalidRequest
  815. case errors.Is(err, ErrRetentionDateRequiresMode):
  816. // For retention date without mode, return InvalidRequest
  817. // This matches the test expectations
  818. return s3err.ErrInvalidRequest
  819. case errors.Is(err, ErrGovernanceBypassVersioningRequired):
  820. // For governance bypass on non-versioned bucket, return InvalidRequest
  821. // This matches the test expectations
  822. return s3err.ErrInvalidRequest
  823. case errors.Is(err, ErrMalformedXML):
  824. // For malformed XML in request body, return MalformedXML
  825. // This matches the test expectations for invalid retention mode and legal hold status
  826. return s3err.ErrMalformedXML
  827. case errors.Is(err, ErrInvalidRetentionPeriod):
  828. // For invalid retention period (e.g., Days <= 0), return InvalidRetentionPeriod
  829. // This matches the test expectations
  830. return s3err.ErrInvalidRetentionPeriod
  831. case errors.Is(err, ErrComplianceModeActive):
  832. // For compliance mode retention violations, return AccessDenied
  833. // This matches the test expectations
  834. return s3err.ErrAccessDenied
  835. case errors.Is(err, ErrGovernanceModeActive):
  836. // For governance mode retention violations, return AccessDenied
  837. // This matches the test expectations
  838. return s3err.ErrAccessDenied
  839. case errors.Is(err, ErrObjectUnderLegalHold):
  840. // For legal hold violations, return AccessDenied
  841. // This matches the test expectations
  842. return s3err.ErrAccessDenied
  843. case errors.Is(err, ErrGovernanceBypassNotPermitted):
  844. // For governance bypass permission violations, return AccessDenied
  845. // This matches the test expectations
  846. return s3err.ErrAccessDenied
  847. // Validation error constants
  848. case errors.Is(err, ErrObjectLockConfigurationMissingEnabled):
  849. return s3err.ErrMalformedXML
  850. case errors.Is(err, ErrInvalidObjectLockEnabledValue):
  851. return s3err.ErrMalformedXML
  852. case errors.Is(err, ErrRuleMissingDefaultRetention):
  853. return s3err.ErrMalformedXML
  854. case errors.Is(err, ErrDefaultRetentionMissingMode):
  855. return s3err.ErrMalformedXML
  856. case errors.Is(err, ErrInvalidDefaultRetentionMode):
  857. return s3err.ErrMalformedXML
  858. case errors.Is(err, ErrDefaultRetentionMissingPeriod):
  859. return s3err.ErrMalformedXML
  860. case errors.Is(err, ErrDefaultRetentionBothDaysAndYears):
  861. return s3err.ErrMalformedXML
  862. case errors.Is(err, ErrDefaultRetentionDaysOutOfRange):
  863. return s3err.ErrInvalidRetentionPeriod
  864. case errors.Is(err, ErrDefaultRetentionYearsOutOfRange):
  865. return s3err.ErrInvalidRetentionPeriod
  866. }
  867. // Check for error constants from the updated validation functions
  868. switch {
  869. case errors.Is(err, ErrRetentionMissingMode):
  870. return s3err.ErrInvalidRequest
  871. case errors.Is(err, ErrRetentionMissingRetainUntilDate):
  872. return s3err.ErrInvalidRequest
  873. case errors.Is(err, ErrInvalidRetentionModeValue):
  874. return s3err.ErrMalformedXML
  875. }
  876. return s3err.ErrInvalidRequest
  877. }
  878. // EntryGetter interface for dependency injection in tests
  879. // Simplified to only mock the data access dependency
  880. type EntryGetter interface {
  881. getEntry(parentDirectoryPath, entryName string) (*filer_pb.Entry, error)
  882. }
  // conditionalHeaders is the parsed form of the conditional request headers
  // If-Match, If-None-Match, If-Modified-Since and If-Unmodified-Since.
  type conditionalHeaders struct {
  	// Raw ETag header values; empty when the header is absent.
  	ifMatch, ifNoneMatch string
  	// Parsed date headers; the zero time when the header is absent.
  	ifModifiedSince, ifUnmodifiedSince time.Time
  	// isSet is true when at least one of the four headers was present.
  	isSet bool
  }
  891. // parseConditionalHeaders extracts and validates conditional headers from the request
  892. func parseConditionalHeaders(r *http.Request) (conditionalHeaders, s3err.ErrorCode) {
  893. headers := conditionalHeaders{
  894. ifMatch: r.Header.Get(s3_constants.IfMatch),
  895. ifNoneMatch: r.Header.Get(s3_constants.IfNoneMatch),
  896. }
  897. ifModifiedSinceStr := r.Header.Get(s3_constants.IfModifiedSince)
  898. ifUnmodifiedSinceStr := r.Header.Get(s3_constants.IfUnmodifiedSince)
  899. // Check if any conditional headers are present
  900. headers.isSet = headers.ifMatch != "" || headers.ifNoneMatch != "" ||
  901. ifModifiedSinceStr != "" || ifUnmodifiedSinceStr != ""
  902. if !headers.isSet {
  903. return headers, s3err.ErrNone
  904. }
  905. // Parse date headers with validation
  906. var err error
  907. if ifModifiedSinceStr != "" {
  908. headers.ifModifiedSince, err = time.Parse(time.RFC1123, ifModifiedSinceStr)
  909. if err != nil {
  910. glog.V(3).Infof("parseConditionalHeaders: Invalid If-Modified-Since format: %v", err)
  911. return headers, s3err.ErrInvalidRequest
  912. }
  913. }
  914. if ifUnmodifiedSinceStr != "" {
  915. headers.ifUnmodifiedSince, err = time.Parse(time.RFC1123, ifUnmodifiedSinceStr)
  916. if err != nil {
  917. glog.V(3).Infof("parseConditionalHeaders: Invalid If-Unmodified-Since format: %v", err)
  918. return headers, s3err.ErrInvalidRequest
  919. }
  920. }
  921. return headers, s3err.ErrNone
  922. }
  923. // S3ApiServer implements EntryGetter interface
  924. func (s3a *S3ApiServer) getObjectETag(entry *filer_pb.Entry) string {
  925. // Try to get ETag from Extended attributes first
  926. if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
  927. return string(etagBytes)
  928. }
  929. // Fallback: calculate ETag from chunks
  930. return s3a.calculateETagFromChunks(entry.Chunks)
  931. }
  932. func (s3a *S3ApiServer) etagMatches(headerValue, objectETag string) bool {
  933. // Clean the object ETag
  934. objectETag = strings.Trim(objectETag, `"`)
  935. // Split header value by commas to handle multiple ETags
  936. etags := strings.Split(headerValue, ",")
  937. for _, etag := range etags {
  938. etag = strings.TrimSpace(etag)
  939. etag = strings.Trim(etag, `"`)
  940. if etag == objectETag {
  941. return true
  942. }
  943. }
  944. return false
  945. }
  946. // checkConditionalHeadersWithGetter is a testable method that accepts a simple EntryGetter
  947. // Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
  948. func (s3a *S3ApiServer) checkConditionalHeadersWithGetter(getter EntryGetter, r *http.Request, bucket, object string) s3err.ErrorCode {
  949. headers, errCode := parseConditionalHeaders(r)
  950. if errCode != s3err.ErrNone {
  951. glog.V(3).Infof("checkConditionalHeaders: Invalid date format")
  952. return errCode
  953. }
  954. if !headers.isSet {
  955. return s3err.ErrNone
  956. }
  957. // Get object entry for conditional checks.
  958. bucketDir := "/buckets/" + bucket
  959. entry, entryErr := getter.getEntry(bucketDir, object)
  960. objectExists := entryErr == nil
  961. // For PUT requests, all specified conditions must be met.
  962. // The evaluation order follows AWS S3 behavior for consistency.
  963. // 1. Check If-Match
  964. if headers.ifMatch != "" {
  965. if !objectExists {
  966. glog.V(3).Infof("checkConditionalHeaders: If-Match failed - object %s/%s does not exist", bucket, object)
  967. return s3err.ErrPreconditionFailed
  968. }
  969. // If `ifMatch` is "*", the condition is met if the object exists.
  970. // Otherwise, we need to check the ETag.
  971. if headers.ifMatch != "*" {
  972. // Use production getObjectETag method
  973. objectETag := s3a.getObjectETag(entry)
  974. // Use production etagMatches method
  975. if !s3a.etagMatches(headers.ifMatch, objectETag) {
  976. glog.V(3).Infof("checkConditionalHeaders: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
  977. return s3err.ErrPreconditionFailed
  978. }
  979. }
  980. glog.V(3).Infof("checkConditionalHeaders: If-Match passed for object %s/%s", bucket, object)
  981. }
  982. // 2. Check If-Unmodified-Since
  983. if !headers.ifUnmodifiedSince.IsZero() {
  984. if objectExists {
  985. objectModTime := time.Unix(entry.Attributes.Mtime, 0)
  986. if objectModTime.After(headers.ifUnmodifiedSince) {
  987. glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
  988. return s3err.ErrPreconditionFailed
  989. }
  990. glog.V(3).Infof("checkConditionalHeaders: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
  991. }
  992. }
  993. // 3. Check If-None-Match
  994. if headers.ifNoneMatch != "" {
  995. if objectExists {
  996. if headers.ifNoneMatch == "*" {
  997. glog.V(3).Infof("checkConditionalHeaders: If-None-Match=* failed - object %s/%s exists", bucket, object)
  998. return s3err.ErrPreconditionFailed
  999. }
  1000. // Use production getObjectETag method
  1001. objectETag := s3a.getObjectETag(entry)
  1002. // Use production etagMatches method
  1003. if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
  1004. glog.V(3).Infof("checkConditionalHeaders: If-None-Match failed - ETag matches %s", objectETag)
  1005. return s3err.ErrPreconditionFailed
  1006. }
  1007. glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
  1008. } else {
  1009. glog.V(3).Infof("checkConditionalHeaders: If-None-Match passed - object %s/%s does not exist", bucket, object)
  1010. }
  1011. }
  1012. // 4. Check If-Modified-Since
  1013. if !headers.ifModifiedSince.IsZero() {
  1014. if objectExists {
  1015. objectModTime := time.Unix(entry.Attributes.Mtime, 0)
  1016. if !objectModTime.After(headers.ifModifiedSince) {
  1017. glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
  1018. return s3err.ErrPreconditionFailed
  1019. }
  1020. glog.V(3).Infof("checkConditionalHeaders: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
  1021. }
  1022. }
  1023. return s3err.ErrNone
  1024. }
  1025. // checkConditionalHeaders is the production method that uses the S3ApiServer as EntryGetter
  1026. func (s3a *S3ApiServer) checkConditionalHeaders(r *http.Request, bucket, object string) s3err.ErrorCode {
  1027. return s3a.checkConditionalHeadersWithGetter(s3a, r, bucket, object)
  1028. }
  1029. // checkConditionalHeadersForReadsWithGetter is a testable method for read operations
  1030. // Uses the production getObjectETag and etagMatches methods to ensure testing of real logic
  1031. func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGetter, r *http.Request, bucket, object string) ConditionalHeaderResult {
  1032. headers, errCode := parseConditionalHeaders(r)
  1033. if errCode != s3err.ErrNone {
  1034. glog.V(3).Infof("checkConditionalHeadersForReads: Invalid date format")
  1035. return ConditionalHeaderResult{ErrorCode: errCode}
  1036. }
  1037. if !headers.isSet {
  1038. return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
  1039. }
  1040. // Get object entry for conditional checks.
  1041. bucketDir := "/buckets/" + bucket
  1042. entry, entryErr := getter.getEntry(bucketDir, object)
  1043. objectExists := entryErr == nil
  1044. // If object doesn't exist, fail for If-Match and If-Unmodified-Since
  1045. if !objectExists {
  1046. if headers.ifMatch != "" {
  1047. glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed - object %s/%s does not exist", bucket, object)
  1048. return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
  1049. }
  1050. if !headers.ifUnmodifiedSince.IsZero() {
  1051. glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object %s/%s does not exist", bucket, object)
  1052. return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
  1053. }
  1054. // If-None-Match and If-Modified-Since succeed when object doesn't exist
  1055. return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
  1056. }
  1057. // Object exists - check all conditions
  1058. // The evaluation order follows AWS S3 behavior for consistency.
  1059. // 1. Check If-Match (412 Precondition Failed if fails)
  1060. if headers.ifMatch != "" {
  1061. // If `ifMatch` is "*", the condition is met if the object exists.
  1062. // Otherwise, we need to check the ETag.
  1063. if headers.ifMatch != "*" {
  1064. // Use production getObjectETag method
  1065. objectETag := s3a.getObjectETag(entry)
  1066. // Use production etagMatches method
  1067. if !s3a.etagMatches(headers.ifMatch, objectETag) {
  1068. glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag)
  1069. return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
  1070. }
  1071. }
  1072. glog.V(3).Infof("checkConditionalHeadersForReads: If-Match passed for object %s/%s", bucket, object)
  1073. }
  1074. // 2. Check If-Unmodified-Since (412 Precondition Failed if fails)
  1075. if !headers.ifUnmodifiedSince.IsZero() {
  1076. objectModTime := time.Unix(entry.Attributes.Mtime, 0)
  1077. if objectModTime.After(headers.ifUnmodifiedSince) {
  1078. glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
  1079. return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed}
  1080. }
  1081. glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince))
  1082. }
  1083. // 3. Check If-None-Match (304 Not Modified if fails)
  1084. if headers.ifNoneMatch != "" {
  1085. // Use production getObjectETag method
  1086. objectETag := s3a.getObjectETag(entry)
  1087. if headers.ifNoneMatch == "*" {
  1088. glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match=* failed - object %s/%s exists", bucket, object)
  1089. return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
  1090. }
  1091. // Use production etagMatches method
  1092. if s3a.etagMatches(headers.ifNoneMatch, objectETag) {
  1093. glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match failed - ETag matches %s", objectETag)
  1094. return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
  1095. }
  1096. glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch)
  1097. }
  1098. // 4. Check If-Modified-Since (304 Not Modified if fails)
  1099. if !headers.ifModifiedSince.IsZero() {
  1100. objectModTime := time.Unix(entry.Attributes.Mtime, 0)
  1101. if !objectModTime.After(headers.ifModifiedSince) {
  1102. // Use production getObjectETag method
  1103. objectETag := s3a.getObjectETag(entry)
  1104. glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince))
  1105. return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag}
  1106. }
  1107. glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince))
  1108. }
  1109. return ConditionalHeaderResult{ErrorCode: s3err.ErrNone}
  1110. }
  1111. // checkConditionalHeadersForReads is the production method that uses the S3ApiServer as EntryGetter
  1112. func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
  1113. return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
  1114. }