| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- package s3api
- import (
- "context"
- "fmt"
- "strings"
- "testing"
- "time"
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/credentials"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/types"
- "github.com/k0kubun/pp"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- // S3TestConfig holds configuration for S3 tests
- type S3TestConfig struct {
- Endpoint string
- AccessKey string
- SecretKey string
- Region string
- BucketPrefix string
- UseSSL bool
- SkipVerifySSL bool
- }
- // Default test configuration - should match s3tests.conf
- var defaultConfig = &S3TestConfig{
- Endpoint: "http://localhost:8333", // Default SeaweedFS S3 port
- AccessKey: "some_access_key1",
- SecretKey: "some_secret_key1",
- Region: "us-east-1",
- BucketPrefix: "test-versioning-",
- UseSSL: false,
- SkipVerifySSL: true,
- }
- // getS3Client creates an AWS S3 client for testing
- func getS3Client(t *testing.T) *s3.Client {
- cfg, err := config.LoadDefaultConfig(context.TODO(),
- config.WithRegion(defaultConfig.Region),
- config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
- defaultConfig.AccessKey,
- defaultConfig.SecretKey,
- "",
- )),
- config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
- func(service, region string, options ...interface{}) (aws.Endpoint, error) {
- return aws.Endpoint{
- URL: defaultConfig.Endpoint,
- SigningRegion: defaultConfig.Region,
- HostnameImmutable: true,
- }, nil
- })),
- )
- require.NoError(t, err)
- return s3.NewFromConfig(cfg, func(o *s3.Options) {
- o.UsePathStyle = true // Important for SeaweedFS
- })
- }
- // getNewBucketName generates a unique bucket name
- func getNewBucketName() string {
- timestamp := time.Now().UnixNano()
- return fmt.Sprintf("%s%d", defaultConfig.BucketPrefix, timestamp)
- }
- // createBucket creates a new bucket for testing
- func createBucket(t *testing.T, client *s3.Client, bucketName string) {
- _, err := client.CreateBucket(context.TODO(), &s3.CreateBucketInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- }
- // deleteBucket deletes a bucket and all its contents
- func deleteBucket(t *testing.T, client *s3.Client, bucketName string) {
- // First, delete all objects and versions
- err := deleteAllObjectVersions(t, client, bucketName)
- if err != nil {
- t.Logf("Warning: failed to delete all object versions: %v", err)
- }
- // Then delete the bucket
- _, err = client.DeleteBucket(context.TODO(), &s3.DeleteBucketInput{
- Bucket: aws.String(bucketName),
- })
- if err != nil {
- t.Logf("Warning: failed to delete bucket %s: %v", bucketName, err)
- }
- }
- // deleteAllObjectVersions deletes all object versions in a bucket
- func deleteAllObjectVersions(t *testing.T, client *s3.Client, bucketName string) error {
- // List all object versions
- paginator := s3.NewListObjectVersionsPaginator(client, &s3.ListObjectVersionsInput{
- Bucket: aws.String(bucketName),
- })
- for paginator.HasMorePages() {
- page, err := paginator.NextPage(context.TODO())
- if err != nil {
- return err
- }
- var objectsToDelete []types.ObjectIdentifier
- // Add versions
- for _, version := range page.Versions {
- objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
- Key: version.Key,
- VersionId: version.VersionId,
- })
- }
- // Add delete markers
- for _, deleteMarker := range page.DeleteMarkers {
- objectsToDelete = append(objectsToDelete, types.ObjectIdentifier{
- Key: deleteMarker.Key,
- VersionId: deleteMarker.VersionId,
- })
- }
- // Delete objects in batches
- if len(objectsToDelete) > 0 {
- _, err := client.DeleteObjects(context.TODO(), &s3.DeleteObjectsInput{
- Bucket: aws.String(bucketName),
- Delete: &types.Delete{
- Objects: objectsToDelete,
- Quiet: aws.Bool(true),
- },
- })
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- // enableVersioning enables versioning on a bucket
- func enableVersioning(t *testing.T, client *s3.Client, bucketName string) {
- _, err := client.PutBucketVersioning(context.TODO(), &s3.PutBucketVersioningInput{
- Bucket: aws.String(bucketName),
- VersioningConfiguration: &types.VersioningConfiguration{
- Status: types.BucketVersioningStatusEnabled,
- },
- })
- require.NoError(t, err)
- }
- // checkVersioningStatus verifies the versioning status of a bucket
- func checkVersioningStatus(t *testing.T, client *s3.Client, bucketName string, expectedStatus types.BucketVersioningStatus) {
- resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- assert.Equal(t, expectedStatus, resp.Status)
- }
- // checkVersioningStatusEmpty verifies that a bucket has no versioning configuration (newly created bucket)
- func checkVersioningStatusEmpty(t *testing.T, client *s3.Client, bucketName string) {
- resp, err := client.GetBucketVersioning(context.TODO(), &s3.GetBucketVersioningInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- // AWS S3 returns an empty versioning configuration (no Status field) for buckets that have never had versioning configured, such as newly created buckets.
- assert.Empty(t, resp.Status, "Newly created bucket should have empty versioning status")
- }
- // putObject puts an object into a bucket
- func putObject(t *testing.T, client *s3.Client, bucketName, key, content string) *s3.PutObjectOutput {
- resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: aws.String(bucketName),
- Key: aws.String(key),
- Body: strings.NewReader(content),
- })
- require.NoError(t, err)
- return resp
- }
- // headObject gets object metadata
- func headObject(t *testing.T, client *s3.Client, bucketName, key string) *s3.HeadObjectOutput {
- resp, err := client.HeadObject(context.TODO(), &s3.HeadObjectInput{
- Bucket: aws.String(bucketName),
- Key: aws.String(key),
- })
- require.NoError(t, err)
- return resp
- }
- // TestBucketListReturnDataVersioning is the Go equivalent of test_bucket_list_return_data_versioning
- func TestBucketListReturnDataVersioning(t *testing.T) {
- client := getS3Client(t)
- bucketName := getNewBucketName()
- // Create bucket
- createBucket(t, client, bucketName)
- defer deleteBucket(t, client, bucketName)
- // Enable versioning
- enableVersioning(t, client, bucketName)
- checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
- // Create test objects
- keyNames := []string{"bar", "baz", "foo"}
- objectData := make(map[string]map[string]interface{})
- for _, keyName := range keyNames {
- // Put the object
- putResp := putObject(t, client, bucketName, keyName, keyName) // content = key name
- // Get object metadata
- headResp := headObject(t, client, bucketName, keyName)
- // Store expected data for later comparison
- objectData[keyName] = map[string]interface{}{
- "ETag": *headResp.ETag,
- "LastModified": *headResp.LastModified,
- "ContentLength": headResp.ContentLength,
- "VersionId": *headResp.VersionId,
- }
- // Verify version ID was returned
- require.NotNil(t, putResp.VersionId)
- require.NotEmpty(t, *putResp.VersionId)
- assert.Equal(t, *putResp.VersionId, *headResp.VersionId)
- }
- // List object versions
- resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- // Verify we have the expected number of versions
- assert.Len(t, resp.Versions, len(keyNames))
- // Check each version matches our stored data
- versionsByKey := make(map[string]types.ObjectVersion)
- for _, version := range resp.Versions {
- versionsByKey[*version.Key] = version
- }
- for _, keyName := range keyNames {
- version, exists := versionsByKey[keyName]
- require.True(t, exists, "Expected version for key %s", keyName)
- expectedData := objectData[keyName]
- // Compare ETag
- assert.Equal(t, expectedData["ETag"], *version.ETag)
- // Compare Size
- assert.Equal(t, expectedData["ContentLength"], version.Size)
- // Compare VersionId
- assert.Equal(t, expectedData["VersionId"], *version.VersionId)
- // Compare LastModified (within reasonable tolerance)
- expectedTime := expectedData["LastModified"].(time.Time)
- actualTime := *version.LastModified
- timeDiff := actualTime.Sub(expectedTime)
- if timeDiff < 0 {
- timeDiff = -timeDiff
- }
- assert.True(t, timeDiff < time.Minute, "LastModified times should be close")
- // Verify this is marked as the latest version
- assert.True(t, *version.IsLatest)
- // Verify it's not a delete marker
- // (delete markers should be in resp.DeleteMarkers, not resp.Versions)
- }
- // Verify no delete markers
- assert.Empty(t, resp.DeleteMarkers)
- t.Logf("Successfully verified %d versioned objects", len(keyNames))
- }
- // TestVersioningBasicWorkflow tests basic versioning operations
- func TestVersioningBasicWorkflow(t *testing.T) {
- client := getS3Client(t)
- bucketName := getNewBucketName()
- // Create bucket
- createBucket(t, client, bucketName)
- defer deleteBucket(t, client, bucketName)
- // Initially, versioning should be unset/empty (not suspended) for newly created buckets
- // This matches AWS S3 behavior where new buckets have no versioning status
- checkVersioningStatusEmpty(t, client, bucketName)
- // Enable versioning
- enableVersioning(t, client, bucketName)
- checkVersioningStatus(t, client, bucketName, types.BucketVersioningStatusEnabled)
- // Put same object multiple times to create versions
- key := "test-object"
- version1 := putObject(t, client, bucketName, key, "content-v1")
- version2 := putObject(t, client, bucketName, key, "content-v2")
- version3 := putObject(t, client, bucketName, key, "content-v3")
- // Verify each put returned a different version ID
- require.NotEqual(t, *version1.VersionId, *version2.VersionId)
- require.NotEqual(t, *version2.VersionId, *version3.VersionId)
- require.NotEqual(t, *version1.VersionId, *version3.VersionId)
- // List versions
- resp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- // Should have 3 versions
- assert.Len(t, resp.Versions, 3)
- // Only the latest should be marked as latest
- latestCount := 0
- for _, version := range resp.Versions {
- if *version.IsLatest {
- latestCount++
- assert.Equal(t, *version3.VersionId, *version.VersionId)
- }
- }
- assert.Equal(t, 1, latestCount, "Only one version should be marked as latest")
- t.Logf("Successfully created and verified %d versions", len(resp.Versions))
- }
- // TestVersioningDeleteMarkers tests delete marker creation
- func TestVersioningDeleteMarkers(t *testing.T) {
- client := getS3Client(t)
- bucketName := getNewBucketName()
- // Create bucket and enable versioning
- createBucket(t, client, bucketName)
- defer deleteBucket(t, client, bucketName)
- enableVersioning(t, client, bucketName)
- // Put an object
- key := "test-delete-marker"
- putResp := putObject(t, client, bucketName, key, "content")
- require.NotNil(t, putResp.VersionId)
- // Delete the object (should create delete marker)
- deleteResp, err := client.DeleteObject(context.TODO(), &s3.DeleteObjectInput{
- Bucket: aws.String(bucketName),
- Key: aws.String(key),
- })
- require.NoError(t, err)
- require.NotNil(t, deleteResp.VersionId)
- // List versions to see the delete marker
- listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
- Bucket: aws.String(bucketName),
- })
- require.NoError(t, err)
- // Should have 1 version and 1 delete marker
- assert.Len(t, listResp.Versions, 1)
- assert.Len(t, listResp.DeleteMarkers, 1)
- // The delete marker should be the latest
- deleteMarker := listResp.DeleteMarkers[0]
- assert.True(t, *deleteMarker.IsLatest)
- assert.Equal(t, *deleteResp.VersionId, *deleteMarker.VersionId)
- // The original version should not be latest
- version := listResp.Versions[0]
- assert.False(t, *version.IsLatest)
- assert.Equal(t, *putResp.VersionId, *version.VersionId)
- t.Logf("Successfully created and verified delete marker")
- }
- // TestVersioningConcurrentOperations tests concurrent versioning operations
- func TestVersioningConcurrentOperations(t *testing.T) {
- client := getS3Client(t)
- bucketName := getNewBucketName()
- // Create bucket and enable versioning
- createBucket(t, client, bucketName)
- defer deleteBucket(t, client, bucketName)
- enableVersioning(t, client, bucketName)
- // Concurrently create multiple objects
- numObjects := 10
- objectKey := "concurrent-test"
- // Channel to collect version IDs
- versionIds := make(chan string, numObjects)
- errors := make(chan error, numObjects)
- // Launch concurrent puts
- for i := 0; i < numObjects; i++ {
- go func(index int) {
- content := fmt.Sprintf("content-%d", index)
- resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
- Bucket: aws.String(bucketName),
- Key: aws.String(objectKey),
- Body: strings.NewReader(content),
- })
- if err != nil {
- errors <- err
- return
- }
- versionIds <- *resp.VersionId
- }(i)
- }
- // Collect results
- var collectedVersionIds []string
- for i := 0; i < numObjects; i++ {
- select {
- case versionId := <-versionIds:
- t.Logf("Received Version ID %d: %s", i, versionId)
- collectedVersionIds = append(collectedVersionIds, versionId)
- case err := <-errors:
- t.Fatalf("Concurrent put failed: %v", err)
- case <-time.After(30 * time.Second):
- t.Fatalf("Timeout waiting for concurrent operations")
- }
- }
- // Verify all version IDs are unique
- versionIdSet := make(map[string]bool)
- for _, versionId := range collectedVersionIds {
- assert.False(t, versionIdSet[versionId], "Version ID should be unique: %s", versionId)
- versionIdSet[versionId] = true
- }
- // List versions and verify count
- listResp, err := client.ListObjectVersions(context.TODO(), &s3.ListObjectVersionsInput{
- Bucket: aws.String(bucketName),
- })
- pp.Println(listResp)
- require.NoError(t, err)
- assert.Len(t, listResp.Versions, numObjects)
- t.Logf("Successfully created %d concurrent versions with unique IDs", numObjects)
- }
|