s3_versioning_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "testing"
  7. "time"
  8. "github.com/aws/aws-sdk-go-v2/aws"
  9. "github.com/aws/aws-sdk-go-v2/config"
  10. "github.com/aws/aws-sdk-go-v2/credentials"
  11. "github.com/aws/aws-sdk-go-v2/service/s3"
  12. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  13. "github.com/k0kubun/pp"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. )
  17. // S3TestConfig holds configuration for S3 tests
  18. type S3TestConfig struct {
  19. Endpoint string
  20. AccessKey string
  21. SecretKey string
  22. Region string
  23. BucketPrefix string
  24. UseSSL bool
  25. SkipVerifySSL bool
  26. }
  27. // Default test configuration - should match s3tests.conf
  28. var defaultConfig = &S3TestConfig{
  29. Endpoint: "http://localhost:8333", // Default SeaweedFS S3 port
  30. AccessKey: "some_access_key1",
  31. SecretKey: "some_secret_key1",
  32. Region: "us-east-1",
  33. BucketPrefix: "test-versioning-",
  34. UseSSL: false,
  35. SkipVerifySSL: true,
  36. }
  37. // getS3Client creates an AWS S3 client for testing
  38. func getS3Client(t *testing.T) *s3.Client {
  39. cfg, err := config.LoadDefaultConfig(context.TODO(),
  40. config.WithRegion(defaultConfig.Region),
  41. config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
  42. defaultConfig.AccessKey,
  43. defaultConfig.SecretKey,
  44. "",
  45. )),
  46. config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
  47. func(service, region string, options ...interface{}) (aws.Endpoint, error) {
  48. return aws.Endpoint{
  49. URL: defaultConfig.Endpoint,
  50. SigningRegion: defaultConfig.Region,
  51. HostnameImmutable: true,
  52. }, nil
  53. })),
  54. )
  55. require.NoError(t, err)
  56. return s3.NewFromConfig(cfg, func(o *s3.Options) {
  57. o.UsePathStyle = true // Important for SeaweedFS
  58. })
  59. }
  60. // getNewBucketName generates a unique bucket name
  61. func getNewBucketName() string {
  62. timestamp := time.Now().UnixNano()
  63. return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
  64. }
  65. // createBucket creates a new bucket for testing
  66. func createBucket(t *testing.T, client *s3.Client, bucketName string) {
  67. _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
  68. Bucket: aws.String(bucketName),
  69. })
  70. require.NoError(t, err)
  71. }
  72. // deleteBucket deletes a bucket and all its contents
  73. func deleteBucket(t *testing.T, client *s3.Client, bucketName string) {
  74. // First, delete all objects and versions
  75. err := deleteAllObjectVersions(t, client, bucketName)
  76. if err != nil {
  77. t.Logf("Warning: failed to delete all object versions: %v", err)
  78. }
  79. // Then delete the bucket
  80. _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
  81. Bucket: aws.String(bucketName),
  82. })
  83. if err != nil {
  84. t.Logf("Warning: failed to delete bucket %s: %v", bucketName, err)
  85. }
  86. }
  87. // deleteAllObjectVersions deletes all object versions in a bucket
  88. func deleteAllObjectVersions(t *testing.T, client *s3.Client, bucketName string) error {
  89. // List all object versions
  90. paginator := s3.NewListObjectVersionsPaginator(client, &s3.ListObjectVersionsInput{
  91. Bucket: aws.String(bucketName),
  92. })
  93. for paginator.HasMorePages() {
  94. page, err := paginator.NextPage(context.TODO())
  95. if err != nil {
  96. return err
  97. }
  98. var objectsToDelete []types.ObjectIdentifier
  99. // Add versions
  100. for _, version := range page.Versions {
  101. objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
  102. Key: version.Key,
  103. VersionId: version.VersionId,
  104. })
  105. }
  106. // Add delete markers
  107. for _, deleteMarker := range page.DeleteMarkers {
  108. objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
  109. Key: deleteMarker.Key,
  110. VersionId: deleteMarker.VersionId,
  111. })
  112. }
  113. // Delete objects in batches
  114. if len(objectsToDelete) > 0 {
  115. _, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
  116. Bucket: aws.String(bucketName),
  117. Delete: &types.Delete{
  118. Objects: objectsToDelete,
  119. Quiet: aws.Bool(true),
  120. },
  121. })
  122. if err != nil {
  123. return err
  124. }
  125. }
  126. }
  127. return nil
  128. }
  129. // enableVersioning enables versioning on a bucket
  130. func enableVersioning(t *testing.T, client *s3.Client, bucketName string) {
  131. _, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
  132. Bucket: aws.String(bucketName),
  133. VersioningConfiguration: &types.VersioningConfiguration{
  134. Status: types.BucketVersioningStatusEnabled,
  135. },
  136. })
  137. require.NoError(t, err)
  138. }
  139. // checkVersioningStatus verifies the versioning status of a bucket
  140. func checkVersioningStatus(t *testing.T, client *s3.Client, bucketName string, expectedStatus types.BucketVersioningStatus) {
  141. resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
  142. Bucket: aws.String(bucketName),
  143. })
  144. require.NoError(t, err)
  145. assert.Equal(t, expectedStatus, resp.Status)
  146. }
  147. // checkVersioningStatusEmpty verifies that a bucket has no versioning configuration (newly created bucket)
  148. func checkVersioningStatusEmpty(t *testing.T, client *s3.Client, bucketName string) {
  149. resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
  150. Bucket: aws.String(bucketName),
  151. })
  152. require.NoError(t, err)
  153. // AWS S3 returns an empty versioning configuration (no Status field) for buckets that have never had versioning configured, such as newly created buckets.
  154. assert.Empty(t, resp.Status, "Newly created bucket should have empty versioning status")
  155. }
  156. // putObject puts an object into a bucket
  157. func putObject(t *testing.T, client *s3.Client, bucketName, key, content string) *s3.PutObjectOutput {
  158. resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
  159. Bucket: aws.String(bucketName),
  160. Key: aws.String(key),
  161. Body: strings.NewReader(content),
  162. })
  163. require.NoError(t, err)
  164. return resp
  165. }
  166. // headObject gets object metadata
  167. func headObject(t *testing.T, client *s3.Client, bucketName, key string) *s3.HeadObjectOutput {
  168. resp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
  169. Bucket: aws.String(bucketName),
  170. Key: aws.String(key),
  171. })
  172. require.NoError(t, err)
  173. return resp
  174. }
  175. // TestBucketListReturnDataVersioning is the Go equivalent of test_bucket_list_return_data_versioning
  176. func TestBucketListReturnDataVersioning(t *testing.T) {
  177. client := getS3Client(t)
  178. bucketName := getNewBucketName()
  179. // Create bucket
  180. createBucket(t, client, bucketName)
  181. defer deleteBucket(t, client, bucketName)
  182. // Enable versioning
  183. enableVersioning(t, client, bucketName)
  184. checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
  185. // Create test objects
  186. keyNames := []string{"bar", "baz", "foo"}
  187. objectData := make(map[string]map[string]interface{})
  188. for _, keyName := range keyNames {
  189. // Put the object
  190. putResp := putObject(t, client, bucketName, keyName, keyName) // content = key name
  191. // Get object metadata
  192. headResp := headObject(t, client, bucketName, keyName)
  193. // Store expected data for later comparison
  194. objectData[keyName] = map[string]interface{}{
  195. "ETag": *headResp.ETag,
  196. "LastModified": *headResp.LastModified,
  197. "ContentLength": headResp.ContentLength,
  198. "VersionId": *headResp.VersionId,
  199. }
  200. // Verify version ID was returned
  201. require.NotNil(t, putResp.VersionId)
  202. require.NotEmpty(t, *putResp.VersionId)
  203. assert.Equal(t, *putResp.VersionId, *headResp.VersionId)
  204. }
  205. // List object versions
  206. resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
  207. Bucket: aws.String(bucketName),
  208. })
  209. require.NoError(t, err)
  210. // Verify we have the expected number of versions
  211. assert.Len(t, resp.Versions, len(keyNames))
  212. // Check each version matches our stored data
  213. versionsByKey := make(map[string]types.ObjectVersion)
  214. for _, version := range resp.Versions {
  215. versionsByKey[*version.Key] = version
  216. }
  217. for _, keyName := range keyNames {
  218. version, exists := versionsByKey[keyName]
  219. require.True(t, exists, "Expected version for key %s", keyName)
  220. expectedData := objectData[keyName]
  221. // Compare ETag
  222. assert.Equal(t, expectedData["ETag"], *version.ETag)
  223. // Compare Size
  224. assert.Equal(t, expectedData["ContentLength"], version.Size)
  225. // Compare VersionId
  226. assert.Equal(t, expectedData["VersionId"], *version.VersionId)
  227. // Compare LastModified (within reasonable tolerance)
  228. expectedTime := expectedData["LastModified"].(time.Time)
  229. actualTime := *version.LastModified
  230. timeDiff := actualTime.Sub(expectedTime)
  231. if timeDiff < 0 {
  232. timeDiff = -timeDiff
  233. }
  234. assert.True(t, timeDiff < time.Minute, "LastModified times should be close")
  235. // Verify this is marked as the latest version
  236. assert.True(t, *version.IsLatest)
  237. // Verify it's not a delete marker
  238. // (delete markers should be in resp.DeleteMarkers, not resp.Versions)
  239. }
  240. // Verify no delete markers
  241. assert.Empty(t, resp.DeleteMarkers)
  242. t.Logf("Successfully verified %d versioned objects", len(keyNames))
  243. }
  244. // TestVersioningBasicWorkflow tests basic versioning operations
  245. func TestVersioningBasicWorkflow(t *testing.T) {
  246. client := getS3Client(t)
  247. bucketName := getNewBucketName()
  248. // Create bucket
  249. createBucket(t, client, bucketName)
  250. defer deleteBucket(t, client, bucketName)
  251. // Initially, versioning should be unset/empty (not suspended) for newly created buckets
  252. // This matches AWS S3 behavior where new buckets have no versioning status
  253. checkVersioningStatusEmpty(t, client, bucketName)
  254. // Enable versioning
  255. enableVersioning(t, client, bucketName)
  256. checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
  257. // Put same object multiple times to create versions
  258. key := "test-object"
  259. version1 := putObject(t, client, bucketName, key, "content-v1")
  260. version2 := putObject(t, client, bucketName, key, "content-v2")
  261. version3 := putObject(t, client, bucketName, key, "content-v3")
  262. // Verify each put returned a different version ID
  263. require.NotEqual(t, *version1.VersionId, *version2.VersionId)
  264. require.NotEqual(t, *version2.VersionId, *version3.VersionId)
  265. require.NotEqual(t, *version1.VersionId, *version3.VersionId)
  266. // List versions
  267. resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
  268. Bucket: aws.String(bucketName),
  269. })
  270. require.NoError(t, err)
  271. // Should have 3 versions
  272. assert.Len(t, resp.Versions, 3)
  273. // Only the latest should be marked as latest
  274. latestCount := 0
  275. for _, version := range resp.Versions {
  276. if *version.IsLatest {
  277. latestCount++
  278. assert.Equal(t, *version3.VersionId, *version.VersionId)
  279. }
  280. }
  281. assert.Equal(t, 1, latestCount, "Only one version should be marked as latest")
  282. t.Logf("Successfully created and verified %d versions", len(resp.Versions))
  283. }
  284. // TestVersioningDeleteMarkers tests delete marker creation
  285. func TestVersioningDeleteMarkers(t *testing.T) {
  286. client := getS3Client(t)
  287. bucketName := getNewBucketName()
  288. // Create bucket and enable versioning
  289. createBucket(t, client, bucketName)
  290. defer deleteBucket(t, client, bucketName)
  291. enableVersioning(t, client, bucketName)
  292. // Put an object
  293. key := "test-delete-marker"
  294. putResp := putObject(t, client, bucketName, key, "content")
  295. require.NotNil(t, putResp.VersionId)
  296. // Delete the object (should create delete marker)
  297. deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
  298. Bucket: aws.String(bucketName),
  299. Key: aws.String(key),
  300. })
  301. require.NoError(t, err)
  302. require.NotNil(t, deleteResp.VersionId)
  303. // List versions to see the delete marker
  304. listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
  305. Bucket: aws.String(bucketName),
  306. })
  307. require.NoError(t, err)
  308. // Should have 1 version and 1 delete marker
  309. assert.Len(t, listResp.Versions, 1)
  310. assert.Len(t, listResp.DeleteMarkers, 1)
  311. // The delete marker should be the latest
  312. deleteMarker := listResp.DeleteMarkers[0]
  313. assert.True(t, *deleteMarker.IsLatest)
  314. assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId)
  315. // The original version should not be latest
  316. version := listResp.Versions[0]
  317. assert.False(t, *version.IsLatest)
  318. assert.Equal(t, *putResp.VersionId, *version.VersionId)
  319. t.Logf("Successfully created and verified delete marker")
  320. }
  321. // TestVersioningConcurrentOperations tests concurrent versioning operations
  322. func TestVersioningConcurrentOperations(t *testing.T) {
  323. client := getS3Client(t)
  324. bucketName := getNewBucketName()
  325. // Create bucket and enable versioning
  326. createBucket(t, client, bucketName)
  327. defer deleteBucket(t, client, bucketName)
  328. enableVersioning(t, client, bucketName)
  329. // Concurrently create multiple objects
  330. numObjects := 10
  331. objectKey := "concurrent-test"
  332. // Channel to collect version IDs
  333. versionIds := make(chan string, numObjects)
  334. errors := make(chan error, numObjects)
  335. // Launch concurrent puts
  336. for i := 0; i < numObjects; i++ {
  337. go func(index int) {
  338. content := fmt.Sprintf("content-%d", index)
  339. resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
  340. Bucket: aws.String(bucketName),
  341. Key: aws.String(objectKey),
  342. Body: strings.NewReader(content),
  343. })
  344. if err != nil {
  345. errors <- err
  346. return
  347. }
  348. versionIds <- *resp.VersionId
  349. }(i)
  350. }
  351. // Collect results
  352. var collectedVersionIds []string
  353. for i := 0; i < numObjects; i++ {
  354. select {
  355. case versionId := <-versionIds:
  356. t.Logf("Received Version ID %d: %s", i, versionId)
  357. collectedVersionIds = append(collectedVersionIds, versionId)
  358. case err := <-errors:
  359. t.Fatalf("Concurrent put failed: %v", err)
  360. case <-time.After(30 * time.Second):
  361. t.Fatalf("Timeout waiting for concurrent operations")
  362. }
  363. }
  364. // Verify all version IDs are unique
  365. versionIdSet := make(map[string]bool)
  366. for _, versionId := range collectedVersionIds {
  367. assert.False(t, versionIdSet[versionId], "Version ID should be unique: %s", versionId)
  368. versionIdSet[versionId] = true
  369. }
  370. // List versions and verify count
  371. listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
  372. Bucket: aws.String(bucketName),
  373. })
  374. pp.Println(listResp)
  375. require.NoError(t, err)
  376. assert.Len(t, listResp.Versions, numObjects)
  377. t.Logf("Successfully created %d concurrent versions with unique IDs", numObjects)
  378. }