s3api_object_versioning.go 34 KB


  1. package s3api
  2. import (
  3. "crypto/rand"
  4. "encoding/hex"
  5. "encoding/xml"
  6. "fmt"
  7. "net/http"
  8. "path"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  17. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  18. )
  19. // S3ListObjectVersionsResult - Custom struct for S3 list-object-versions response
  20. // This avoids conflicts with the XSD generated ListVersionsResult struct
  21. // and ensures proper separation of versions and delete markers into arrays
  22. type S3ListObjectVersionsResult struct {
  23. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
  24. Name string `xml:"Name"`
  25. Prefix string `xml:"Prefix,omitempty"`
  26. KeyMarker string `xml:"KeyMarker,omitempty"`
  27. VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
  28. NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
  29. NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
  30. MaxKeys int `xml:"MaxKeys"`
  31. Delimiter string `xml:"Delimiter,omitempty"`
  32. IsTruncated bool `xml:"IsTruncated"`
  33. // These are the critical fields - arrays instead of single elements
  34. Versions []VersionEntry `xml:"Version,omitempty"` // Array for versions
  35. DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"` // Array for delete markers
  36. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  37. EncodingType string `xml:"EncodingType,omitempty"`
  38. }
  39. // Original struct - keeping for compatibility but will use S3ListObjectVersionsResult for XML response
  40. type ListObjectVersionsResult struct {
  41. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
  42. Name string `xml:"Name"`
  43. Prefix string `xml:"Prefix"`
  44. KeyMarker string `xml:"KeyMarker,omitempty"`
  45. VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
  46. NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
  47. NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
  48. MaxKeys int `xml:"MaxKeys"`
  49. Delimiter string `xml:"Delimiter,omitempty"`
  50. IsTruncated bool `xml:"IsTruncated"`
  51. Versions []VersionEntry `xml:"Version,omitempty"`
  52. DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"`
  53. CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
  54. }
  55. // ObjectVersion represents a version of an S3 object
  56. type ObjectVersion struct {
  57. VersionId string
  58. IsLatest bool
  59. IsDeleteMarker bool
  60. LastModified time.Time
  61. ETag string
  62. Size int64
  63. Entry *filer_pb.Entry
  64. }
  65. // generateVersionId creates a unique version ID that preserves chronological order
  66. func generateVersionId() string {
  67. // Use nanosecond timestamp to ensure chronological ordering
  68. // Format as 16-digit hex (first 16 chars of version ID)
  69. now := time.Now().UnixNano()
  70. timestampHex := fmt.Sprintf("%016x", now)
  71. // Generate random 8 bytes for uniqueness (last 16 chars of version ID)
  72. randBytes := make([]byte, 8)
  73. if _, err := rand.Read(randBytes); err != nil {
  74. glog.Errorf("Failed to generate random bytes for version ID: %v", err)
  75. // Fallback to timestamp-only if random generation fails
  76. return timestampHex + "0000000000000000"
  77. }
  78. // Combine timestamp (16 chars) + random (16 chars) = 32 chars total
  79. randomHex := hex.EncodeToString(randBytes)
  80. versionId := timestampHex + randomHex
  81. return versionId
  82. }
  83. // getVersionedObjectDir returns the directory path for storing object versions
  84. func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
  85. return path.Join(s3a.option.BucketsPath, bucket, object+".versions")
  86. }
  87. // getVersionFileName returns the filename for a specific version
  88. func (s3a *S3ApiServer) getVersionFileName(versionId string) string {
  89. return fmt.Sprintf("v_%s", versionId)
  90. }
  91. // createDeleteMarker creates a delete marker for versioned delete operations
  92. func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
  93. versionId := generateVersionId()
  94. glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object)
  95. // Create the version file name for the delete marker
  96. versionFileName := s3a.getVersionFileName(versionId)
  97. // Store delete marker in the .versions directory
  98. // Make sure to clean up the object path to remove leading slashes
  99. cleanObject := strings.TrimPrefix(object, "/")
  100. bucketDir := s3a.option.BucketsPath + "/" + bucket
  101. versionsDir := bucketDir + "/" + cleanObject + ".versions"
  102. // Create the delete marker entry in the .versions directory
  103. err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
  104. entry.Name = versionFileName
  105. entry.IsDirectory = false
  106. if entry.Attributes == nil {
  107. entry.Attributes = &filer_pb.FuseAttributes{}
  108. }
  109. entry.Attributes.Mtime = time.Now().Unix()
  110. if entry.Extended == nil {
  111. entry.Extended = make(map[string][]byte)
  112. }
  113. entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
  114. entry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("true")
  115. })
  116. if err != nil {
  117. return "", fmt.Errorf("failed to create delete marker in .versions directory: %w", err)
  118. }
  119. // Update the .versions directory metadata to indicate this delete marker is the latest version
  120. err = s3a.updateLatestVersionInDirectory(bucket, cleanObject, versionId, versionFileName)
  121. if err != nil {
  122. glog.Errorf("createDeleteMarker: failed to update latest version in directory: %v", err)
  123. return "", fmt.Errorf("failed to update latest version in directory: %w", err)
  124. }
  125. glog.V(2).Infof("createDeleteMarker: successfully created delete marker %s for %s/%s", versionId, bucket, object)
  126. return versionId, nil
  127. }
  128. // listObjectVersions lists all versions of an object
  129. func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) {
  130. var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
  131. // Track objects that have been processed to avoid duplicates
  132. processedObjects := make(map[string]bool)
  133. // Track version IDs globally to prevent duplicates throughout the listing
  134. seenVersionIds := make(map[string]bool)
  135. // Recursively find all .versions directories in the bucket
  136. bucketPath := path.Join(s3a.option.BucketsPath, bucket)
  137. err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix)
  138. if err != nil {
  139. return nil, err
  140. }
  141. // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
  142. sort.Slice(allVersions, func(i, j int) bool {
  143. var keyI, keyJ string
  144. var lastModifiedI, lastModifiedJ time.Time
  145. var versionIdI, versionIdJ string
  146. switch v := allVersions[i].(type) {
  147. case *VersionEntry:
  148. keyI = v.Key
  149. lastModifiedI = v.LastModified
  150. versionIdI = v.VersionId
  151. case *DeleteMarkerEntry:
  152. keyI = v.Key
  153. lastModifiedI = v.LastModified
  154. versionIdI = v.VersionId
  155. }
  156. switch v := allVersions[j].(type) {
  157. case *VersionEntry:
  158. keyJ = v.Key
  159. lastModifiedJ = v.LastModified
  160. versionIdJ = v.VersionId
  161. case *DeleteMarkerEntry:
  162. keyJ = v.Key
  163. lastModifiedJ = v.LastModified
  164. versionIdJ = v.VersionId
  165. }
  166. // First sort by object key
  167. if keyI != keyJ {
  168. return keyI < keyJ
  169. }
  170. // Then by modification time (newest first) - but use nanosecond precision for ties
  171. timeDiff := lastModifiedI.Sub(lastModifiedJ)
  172. if timeDiff.Abs() > time.Millisecond {
  173. return lastModifiedI.After(lastModifiedJ)
  174. }
  175. // For very close timestamps (within 1ms), use version ID for deterministic ordering
  176. // Sort version IDs in reverse lexicographic order to maintain newest-first semantics
  177. return versionIdI > versionIdJ
  178. })
  179. // Build result using S3ListObjectVersionsResult to avoid conflicts with XSD structs
  180. result := &S3ListObjectVersionsResult{
  181. Name: bucket,
  182. Prefix: prefix,
  183. KeyMarker: keyMarker,
  184. MaxKeys: maxKeys,
  185. Delimiter: delimiter,
  186. IsTruncated: len(allVersions) > maxKeys,
  187. }
  188. // Limit results
  189. if len(allVersions) > maxKeys {
  190. allVersions = allVersions[:maxKeys]
  191. result.IsTruncated = true
  192. // Set next markers
  193. switch v := allVersions[len(allVersions)-1].(type) {
  194. case *VersionEntry:
  195. result.NextKeyMarker = v.Key
  196. result.NextVersionIdMarker = v.VersionId
  197. case *DeleteMarkerEntry:
  198. result.NextKeyMarker = v.Key
  199. result.NextVersionIdMarker = v.VersionId
  200. }
  201. }
  202. // Always initialize empty slices so boto3 gets the expected fields even when empty
  203. result.Versions = make([]VersionEntry, 0)
  204. result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
  205. // Add versions to result
  206. for _, version := range allVersions {
  207. switch v := version.(type) {
  208. case *VersionEntry:
  209. result.Versions = append(result.Versions, *v)
  210. case *DeleteMarkerEntry:
  211. result.DeleteMarkers = append(result.DeleteMarkers, *v)
  212. }
  213. }
  214. return result, nil
  215. }
  216. // findVersionsRecursively searches for all .versions directories and regular files recursively
  217. func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error {
  218. // List entries in current directory
  219. entries, _, err := s3a.list(currentPath, "", "", false, 1000)
  220. if err != nil {
  221. return err
  222. }
  223. for _, entry := range entries {
  224. entryPath := path.Join(relativePath, entry.Name)
  225. // Skip if this doesn't match the prefix filter
  226. if normalizedPrefix := strings.TrimPrefix(prefix, "/"); normalizedPrefix != "" {
  227. // An entry is a candidate if:
  228. // 1. Its path is a match for the prefix.
  229. // 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
  230. // Condition 1: The entry's path starts with the prefix.
  231. isMatch := strings.HasPrefix(entryPath, normalizedPrefix)
  232. if !isMatch && entry.IsDirectory {
  233. // Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
  234. isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix)
  235. }
  236. // Condition 2: The prefix path starts with the entry's path (and it's a directory).
  237. canDescend := entry.IsDirectory && strings.HasPrefix(normalizedPrefix, entryPath)
  238. if !isMatch && !canDescend {
  239. continue
  240. }
  241. }
  242. if entry.IsDirectory {
  243. // Skip .uploads directory (multipart upload temporary files)
  244. if strings.HasPrefix(entry.Name, ".uploads") {
  245. continue
  246. }
  247. // Check if this is a .versions directory
  248. if strings.HasSuffix(entry.Name, ".versions") {
  249. // Extract object name from .versions directory name
  250. objectKey := strings.TrimSuffix(entryPath, ".versions")
  251. processedObjects[objectKey] = true
  252. glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey)
  253. versions, err := s3a.getObjectVersionList(bucket, objectKey)
  254. if err != nil {
  255. glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
  256. continue
  257. }
  258. for _, version := range versions {
  259. // Check for duplicate version IDs and skip if already seen
  260. versionKey := objectKey + ":" + version.VersionId
  261. if seenVersionIds[versionKey] {
  262. glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey)
  263. continue
  264. }
  265. seenVersionIds[versionKey] = true
  266. if version.IsDeleteMarker {
  267. deleteMarker := &DeleteMarkerEntry{
  268. Key: objectKey,
  269. VersionId: version.VersionId,
  270. IsLatest: version.IsLatest,
  271. LastModified: version.LastModified,
  272. Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
  273. }
  274. *allVersions = append(*allVersions, deleteMarker)
  275. } else {
  276. versionEntry := &VersionEntry{
  277. Key: objectKey,
  278. VersionId: version.VersionId,
  279. IsLatest: version.IsLatest,
  280. LastModified: version.LastModified,
  281. ETag: version.ETag,
  282. Size: version.Size,
  283. Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
  284. StorageClass: "STANDARD",
  285. }
  286. *allVersions = append(*allVersions, versionEntry)
  287. }
  288. }
  289. } else {
  290. // This is a regular directory - check if it's an explicit S3 directory object
  291. // Only include directories that were explicitly created via S3 API (have FolderMimeType)
  292. // This excludes implicit directories created when uploading files like "test1/a"
  293. if entry.Attributes.Mime == s3_constants.FolderMimeType {
  294. directoryKey := entryPath
  295. if !strings.HasSuffix(directoryKey, "/") {
  296. directoryKey += "/"
  297. }
  298. // Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
  299. glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey)
  300. // Calculate ETag for empty directory
  301. directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
  302. versionEntry := &VersionEntry{
  303. Key: directoryKey,
  304. VersionId: "null",
  305. IsLatest: true,
  306. LastModified: time.Unix(entry.Attributes.Mtime, 0),
  307. ETag: directoryETag,
  308. Size: 0, // Directories have size 0
  309. Owner: s3a.getObjectOwnerFromEntry(entry),
  310. StorageClass: "STANDARD",
  311. }
  312. *allVersions = append(*allVersions, versionEntry)
  313. }
  314. // Recursively search subdirectories (regardless of whether they're explicit or implicit)
  315. fullPath := path.Join(currentPath, entry.Name)
  316. err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
  317. if err != nil {
  318. glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
  319. continue
  320. }
  321. }
  322. } else {
  323. // This is a regular file - check if it's a pre-versioning object
  324. objectKey := entryPath
  325. // Skip if this object already has a .versions directory (already processed)
  326. if processedObjects[objectKey] {
  327. continue
  328. }
  329. // This is a pre-versioning object - treat it as a version with VersionId="null"
  330. glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey)
  331. // Check if this null version should be marked as latest
  332. // It's only latest if there's no .versions directory OR no latest version metadata
  333. isLatest := true
  334. versionsObjectPath := objectKey + ".versions"
  335. if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil {
  336. // .versions directory exists, check if there's latest version metadata
  337. if versionsEntry.Extended != nil {
  338. if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
  339. // There is a latest version in the .versions directory, so null is not latest
  340. isLatest = false
  341. glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey)
  342. }
  343. }
  344. }
  345. etag := s3a.calculateETagFromChunks(entry.Chunks)
  346. versionEntry := &VersionEntry{
  347. Key: objectKey,
  348. VersionId: "null",
  349. IsLatest: isLatest,
  350. LastModified: time.Unix(entry.Attributes.Mtime, 0),
  351. ETag: etag,
  352. Size: int64(entry.Attributes.FileSize),
  353. Owner: s3a.getObjectOwnerFromEntry(entry),
  354. StorageClass: "STANDARD",
  355. }
  356. *allVersions = append(*allVersions, versionEntry)
  357. }
  358. }
  359. return nil
  360. }
  361. // getObjectVersionList returns all versions of a specific object
  362. func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
  363. var versions []*ObjectVersion
  364. glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object)
  365. // All versions are now stored in the .versions directory only
  366. bucketDir := s3a.option.BucketsPath + "/" + bucket
  367. versionsObjectPath := object + ".versions"
  368. glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
  369. // Get the .versions directory entry to read latest version metadata
  370. versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
  371. if err != nil {
  372. // No versions directory exists, return empty list
  373. glog.V(2).Infof("getObjectVersionList: no versions directory found: %v", err)
  374. return versions, nil
  375. }
  376. // Get the latest version info from directory metadata
  377. var latestVersionId string
  378. if versionsEntry.Extended != nil {
  379. if latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatestVersionId {
  380. latestVersionId = string(latestVersionIdBytes)
  381. glog.V(2).Infof("getObjectVersionList: latest version ID from directory metadata: %s", latestVersionId)
  382. }
  383. }
  384. // List all version files in the .versions directory
  385. entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000)
  386. if err != nil {
  387. glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err)
  388. return versions, nil
  389. }
  390. glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
  391. // Use a map to detect and prevent duplicate version IDs
  392. seenVersionIds := make(map[string]bool)
  393. for i, entry := range entries {
  394. if entry.Extended == nil {
  395. glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
  396. continue
  397. }
  398. versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
  399. if !hasVersionId {
  400. glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
  401. continue
  402. }
  403. versionId := string(versionIdBytes)
  404. // Check for duplicate version IDs and skip if already seen
  405. if seenVersionIds[versionId] {
  406. glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
  407. continue
  408. }
  409. seenVersionIds[versionId] = true
  410. // Check if this version is the latest by comparing with directory metadata
  411. isLatest := (versionId == latestVersionId)
  412. isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
  413. isDeleteMarker := string(isDeleteMarkerBytes) == "true"
  414. glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
  415. version := &ObjectVersion{
  416. VersionId: versionId,
  417. IsLatest: isLatest,
  418. IsDeleteMarker: isDeleteMarker,
  419. LastModified: time.Unix(entry.Attributes.Mtime, 0),
  420. Entry: entry,
  421. }
  422. if !isDeleteMarker {
  423. // Try to get ETag from Extended attributes first
  424. if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
  425. version.ETag = string(etagBytes)
  426. } else {
  427. // Fallback: calculate ETag from chunks
  428. version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
  429. }
  430. version.Size = int64(entry.Attributes.FileSize)
  431. }
  432. versions = append(versions, version)
  433. }
  434. // Don't sort here - let the main listObjectVersions function handle sorting consistently
  435. glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries))
  436. for i, version := range versions {
  437. glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
  438. }
  439. return versions, nil
  440. }
  441. // calculateETagFromChunks calculates ETag from file chunks following S3 multipart rules
  442. // This is a wrapper around filer.ETagChunks that adds quotes for S3 compatibility
  443. func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) string {
  444. if len(chunks) == 0 {
  445. return "\"\""
  446. }
  447. // Use the existing filer ETag calculation and add quotes for S3 compatibility
  448. etag := filer.ETagChunks(chunks)
  449. if etag == "" {
  450. return "\"\""
  451. }
  452. return fmt.Sprintf("\"%s\"", etag)
  453. }
  454. // getSpecificObjectVersion retrieves a specific version of an object
  455. func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
  456. if versionId == "" {
  457. // Get current version
  458. return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
  459. }
  460. if versionId == "null" {
  461. // "null" version ID refers to pre-versioning objects stored as regular files
  462. bucketDir := s3a.option.BucketsPath + "/" + bucket
  463. entry, err := s3a.getEntry(bucketDir, object)
  464. if err != nil {
  465. return nil, fmt.Errorf("null version object %s not found: %v", object, err)
  466. }
  467. return entry, nil
  468. }
  469. // Get specific version from .versions directory
  470. versionsDir := s3a.getVersionedObjectDir(bucket, object)
  471. versionFile := s3a.getVersionFileName(versionId)
  472. entry, err := s3a.getEntry(versionsDir, versionFile)
  473. if err != nil {
  474. return nil, fmt.Errorf("version %s not found: %v", versionId, err)
  475. }
  476. return entry, nil
  477. }
  478. // deleteSpecificObjectVersion deletes a specific version of an object
  479. func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
  480. if versionId == "" {
  481. return fmt.Errorf("version ID is required for version-specific deletion")
  482. }
  483. if versionId == "null" {
  484. // Delete "null" version (pre-versioning object stored as regular file)
  485. bucketDir := s3a.option.BucketsPath + "/" + bucket
  486. cleanObject := strings.TrimPrefix(object, "/")
  487. // Check if the object exists
  488. _, err := s3a.getEntry(bucketDir, cleanObject)
  489. if err != nil {
  490. // Object doesn't exist - this is OK for delete operations (idempotent)
  491. glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", cleanObject)
  492. return nil
  493. }
  494. // Delete the regular file
  495. deleteErr := s3a.rm(bucketDir, cleanObject, true, false)
  496. if deleteErr != nil {
  497. // Check if file was already deleted by another process
  498. if _, checkErr := s3a.getEntry(bucketDir, cleanObject); checkErr != nil {
  499. // File doesn't exist anymore, deletion was successful
  500. return nil
  501. }
  502. return fmt.Errorf("failed to delete null version %s: %v", cleanObject, deleteErr)
  503. }
  504. return nil
  505. }
  506. versionsDir := s3a.getVersionedObjectDir(bucket, object)
  507. versionFile := s3a.getVersionFileName(versionId)
  508. // Check if this is the latest version before attempting deletion (for potential metadata update)
  509. versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
  510. isLatestVersion := false
  511. if dirErr == nil && versionsEntry.Extended != nil {
  512. if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
  513. isLatestVersion = (string(latestVersionIdBytes) == versionId)
  514. }
  515. }
  516. // Attempt to delete the version file
  517. // Note: We don't check if the file exists first to avoid race conditions
  518. // The deletion operation should be idempotent
  519. deleteErr := s3a.rm(versionsDir, versionFile, true, false)
  520. if deleteErr != nil {
  521. // Check if file was already deleted by another process (race condition handling)
  522. if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
  523. // File doesn't exist anymore, deletion was successful (another thread deleted it)
  524. glog.V(2).Infof("deleteSpecificObjectVersion: version %s for %s%s already deleted by another process", versionId, bucket, object)
  525. return nil
  526. }
  527. // File still exists but deletion failed for another reason
  528. return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
  529. }
  530. // If we deleted the latest version, update the .versions directory metadata to point to the new latest
  531. if isLatestVersion {
  532. err := s3a.updateLatestVersionAfterDeletion(bucket, object)
  533. if err != nil {
  534. glog.Warningf("deleteSpecificObjectVersion: failed to update latest version after deletion: %v", err)
  535. // Don't return error since the deletion was successful
  536. }
  537. }
  538. return nil
  539. }
  540. // updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest
  541. func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
  542. bucketDir := s3a.option.BucketsPath + "/" + bucket
  543. cleanObject := strings.TrimPrefix(object, "/")
  544. versionsObjectPath := cleanObject + ".versions"
  545. versionsDir := bucketDir + "/" + versionsObjectPath
  546. glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
  547. // List all remaining version files in the .versions directory
  548. entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
  549. if err != nil {
  550. glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
  551. return fmt.Errorf("failed to list versions: %v", err)
  552. }
  553. glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir)
  554. // Find the most recent remaining version (latest timestamp in version ID)
  555. var latestVersionId string
  556. var latestVersionFileName string
  557. for _, entry := range entries {
  558. if entry.Extended == nil {
  559. continue
  560. }
  561. versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
  562. if !hasVersionId {
  563. continue
  564. }
  565. versionId := string(versionIdBytes)
  566. // Skip delete markers when finding latest content version
  567. isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
  568. if string(isDeleteMarkerBytes) == "true" {
  569. continue
  570. }
  571. // Compare version IDs chronologically (our version IDs start with timestamp)
  572. if latestVersionId == "" || versionId > latestVersionId {
  573. glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
  574. latestVersionId = versionId
  575. latestVersionFileName = entry.Name
  576. } else {
  577. glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId)
  578. }
  579. }
  580. // Update the .versions directory metadata
  581. versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
  582. if err != nil {
  583. return fmt.Errorf("failed to get .versions directory: %v", err)
  584. }
  585. if versionsEntry.Extended == nil {
  586. versionsEntry.Extended = make(map[string][]byte)
  587. }
  588. if latestVersionId != "" {
  589. // Update metadata to point to new latest version
  590. versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
  591. versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
  592. glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId)
  593. } else {
  594. // No versions left, remove latest version metadata
  595. delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
  596. delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
  597. glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s", bucket, object)
  598. }
  599. // Update the .versions directory entry
  600. err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
  601. updatedEntry.Extended = versionsEntry.Extended
  602. updatedEntry.Attributes = versionsEntry.Attributes
  603. updatedEntry.Chunks = versionsEntry.Chunks
  604. })
  605. if err != nil {
  606. return fmt.Errorf("failed to update .versions directory metadata: %v", err)
  607. }
  608. return nil
  609. }
  610. // ListObjectVersionsHandler handles the list object versions request
  611. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
  612. func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
  613. bucket, _ := s3_constants.GetBucketAndObject(r)
  614. glog.V(3).Infof("ListObjectVersionsHandler %s", bucket)
  615. if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
  616. s3err.WriteErrorResponse(w, r, err)
  617. return
  618. }
  619. // Parse query parameters
  620. query := r.URL.Query()
  621. originalPrefix := query.Get("prefix") // Keep original prefix for response
  622. prefix := originalPrefix // Use for internal processing
  623. if prefix != "" && !strings.HasPrefix(prefix, "/") {
  624. prefix = "/" + prefix
  625. }
  626. keyMarker := query.Get("key-marker")
  627. versionIdMarker := query.Get("version-id-marker")
  628. delimiter := query.Get("delimiter")
  629. maxKeysStr := query.Get("max-keys")
  630. maxKeys := 1000
  631. if maxKeysStr != "" {
  632. if mk, err := strconv.Atoi(maxKeysStr); err == nil && mk > 0 {
  633. maxKeys = mk
  634. }
  635. }
  636. // List versions
  637. result, err := s3a.listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys)
  638. if err != nil {
  639. glog.Errorf("ListObjectVersionsHandler: %v", err)
  640. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  641. return
  642. }
  643. // Set the original prefix in the response (not the normalized internal prefix)
  644. result.Prefix = originalPrefix
  645. writeSuccessResponseXML(w, r, result)
  646. }
  647. // getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
  648. func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
  649. bucketDir := s3a.option.BucketsPath + "/" + bucket
  650. versionsObjectPath := object + ".versions"
  651. // Get the .versions directory entry to read latest version metadata
  652. versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
  653. if err != nil {
  654. // .versions directory doesn't exist - this can happen for objects that existed
  655. // before versioning was enabled on the bucket. Fall back to checking for a
  656. // regular (non-versioned) object file.
  657. glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s%s, checking for pre-versioning object", bucket, object)
  658. regularEntry, regularErr := s3a.getEntry(bucketDir, object)
  659. if regularErr != nil {
  660. return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, object, err)
  661. }
  662. glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, object)
  663. return regularEntry, nil
  664. }
  665. // Check if directory has latest version metadata
  666. if versionsEntry.Extended == nil {
  667. // No metadata means all versioned objects have been deleted.
  668. // Fall back to checking for a pre-versioning object.
  669. glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s, checking for pre-versioning object", bucket, object)
  670. regularEntry, regularErr := s3a.getEntry(bucketDir, object)
  671. if regularErr != nil {
  672. return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
  673. }
  674. glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object)
  675. return regularEntry, nil
  676. }
  677. latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
  678. latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
  679. if !hasLatestVersionId || !hasLatestVersionFile {
  680. // No version metadata means all versioned objects have been deleted.
  681. // Fall back to checking for a pre-versioning object.
  682. glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
  683. regularEntry, regularErr := s3a.getEntry(bucketDir, object)
  684. if regularErr != nil {
  685. return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
  686. }
  687. glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s after version deletion", bucket, object)
  688. return regularEntry, nil
  689. }
  690. latestVersionId := string(latestVersionIdBytes)
  691. latestVersionFile := string(latestVersionFileBytes)
  692. glog.V(2).Infof("getLatestObjectVersion: found latest version %s (file: %s) for %s/%s", latestVersionId, latestVersionFile, bucket, object)
  693. // Get the actual latest version file entry
  694. latestVersionPath := versionsObjectPath + "/" + latestVersionFile
  695. latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
  696. if err != nil {
  697. return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
  698. }
  699. return latestVersionEntry, nil
  700. }
  701. // getObjectOwnerFromVersion extracts object owner information from version entry metadata
  702. func (s3a *S3ApiServer) getObjectOwnerFromVersion(version *ObjectVersion, bucket, objectKey string) CanonicalUser {
  703. // First try to get owner from the version entry itself
  704. if version.Entry != nil && version.Entry.Extended != nil {
  705. if ownerBytes, exists := version.Entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  706. ownerId := string(ownerBytes)
  707. ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
  708. return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
  709. }
  710. }
  711. // Fallback: try to get owner from the current version of the object
  712. // This handles cases where older versions might not have owner metadata
  713. if version.VersionId == "null" {
  714. // For null version, check the regular object file
  715. bucketDir := s3a.option.BucketsPath + "/" + bucket
  716. if entry, err := s3a.getEntry(bucketDir, objectKey); err == nil && entry.Extended != nil {
  717. if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  718. ownerId := string(ownerBytes)
  719. ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
  720. return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
  721. }
  722. }
  723. } else {
  724. // For versioned objects, try to get from latest version metadata
  725. if latestVersion, err := s3a.getLatestObjectVersion(bucket, objectKey); err == nil && latestVersion.Extended != nil {
  726. if ownerBytes, exists := latestVersion.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  727. ownerId := string(ownerBytes)
  728. ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
  729. return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
  730. }
  731. }
  732. }
  733. // Ultimate fallback: return anonymous if no owner found
  734. return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
  735. }
  736. // getObjectOwnerFromEntry extracts object owner information from a file entry
  737. func (s3a *S3ApiServer) getObjectOwnerFromEntry(entry *filer_pb.Entry) CanonicalUser {
  738. if entry != nil && entry.Extended != nil {
  739. if ownerBytes, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  740. ownerId := string(ownerBytes)
  741. ownerDisplayName := s3a.iam.GetAccountNameById(ownerId)
  742. return CanonicalUser{ID: ownerId, DisplayName: ownerDisplayName}
  743. }
  744. }
  745. // Fallback: return anonymous if no owner found
  746. return CanonicalUser{ID: s3_constants.AccountAnonymousId, DisplayName: "anonymous"}
  747. }