| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- package s3api
- import (
- "net/http"
- "net/http/httptest"
- "strings"
- "testing"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- )
- // TestCheckGovernanceBypassPermissionResourceGeneration tests that the function
- // correctly generates resource paths for the permission check
- func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) {
- tests := []struct {
- name string
- bucket string
- object string
- expectedPath string
- description string
- }{
- {
- name: "simple_object",
- bucket: "test-bucket",
- object: "test-object.txt",
- expectedPath: "test-bucket/test-object.txt",
- description: "Simple bucket and object should be joined with slash",
- },
- {
- name: "object_with_leading_slash",
- bucket: "test-bucket",
- object: "/test-object.txt",
- expectedPath: "test-bucket/test-object.txt",
- description: "Leading slash should be trimmed from object name",
- },
- {
- name: "nested_object",
- bucket: "test-bucket",
- object: "/folder/subfolder/test-object.txt",
- expectedPath: "test-bucket/folder/subfolder/test-object.txt",
- description: "Nested object path should be handled correctly",
- },
- {
- name: "empty_object",
- bucket: "test-bucket",
- object: "",
- expectedPath: "test-bucket/",
- description: "Empty object should result in bucket with trailing slash",
- },
- {
- name: "root_object",
- bucket: "test-bucket",
- object: "/",
- expectedPath: "test-bucket/",
- description: "Root object should result in bucket with trailing slash",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Test the resource generation logic used in checkGovernanceBypassPermission
- resource := strings.TrimPrefix(tt.object, "/")
- actualPath := tt.bucket + "/" + resource
- if actualPath != tt.expectedPath {
- t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s",
- tt.expectedPath, actualPath, tt.description)
- }
- })
- }
- }
- // TestCheckGovernanceBypassPermissionActionGeneration tests that the function
- // correctly generates action strings for IAM checking
- func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) {
- tests := []struct {
- name string
- bucket string
- object string
- expectedBypassAction string
- expectedAdminAction string
- description string
- }{
- {
- name: "bypass_action_generation",
- bucket: "test-bucket",
- object: "test-object.txt",
- expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
- expectedAdminAction: "Admin:test-bucket/test-object.txt",
- description: "Actions should be properly formatted with resource path",
- },
- {
- name: "leading_slash_handling",
- bucket: "test-bucket",
- object: "/test-object.txt",
- expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
- expectedAdminAction: "Admin:test-bucket/test-object.txt",
- description: "Leading slash should be trimmed in action generation",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Test the action generation logic used in checkGovernanceBypassPermission
- resource := strings.TrimPrefix(tt.object, "/")
- resourcePath := tt.bucket + "/" + resource
- bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath
- adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath
- if bypassAction != tt.expectedBypassAction {
- t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s",
- tt.expectedBypassAction, bypassAction, tt.description)
- }
- if adminAction != tt.expectedAdminAction {
- t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s",
- tt.expectedAdminAction, adminAction, tt.description)
- }
- })
- }
- }
- // TestCheckGovernanceBypassPermissionErrorHandling tests error handling scenarios
- func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) {
- // Note: This test demonstrates the expected behavior for different error scenarios
- // without requiring full IAM setup
- tests := []struct {
- name string
- bucket string
- object string
- description string
- }{
- {
- name: "empty_bucket",
- bucket: "",
- object: "test-object.txt",
- description: "Empty bucket should be handled gracefully",
- },
- {
- name: "special_characters",
- bucket: "test-bucket",
- object: "test object with spaces.txt",
- description: "Objects with special characters should be handled",
- },
- {
- name: "unicode_characters",
- bucket: "test-bucket",
- object: "测试文件.txt",
- description: "Objects with unicode characters should be handled",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Test that the function doesn't panic with various inputs
- // This would normally call checkGovernanceBypassPermission
- // but since we don't have a full S3ApiServer setup, we just test
- // that the resource generation logic works without panicking
- resource := strings.TrimPrefix(tt.object, "/")
- resourcePath := tt.bucket + "/" + resource
- // Verify the resource path is generated
- if resourcePath == "" {
- t.Errorf("Resource path should not be empty for test case: %s", tt.description)
- }
- t.Logf("Generated resource path for %s: %s", tt.description, resourcePath)
- })
- }
- }
- // TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected behavior
- // when integrated with a full IAM system
- func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) {
- t.Skip("Documentation test - describes expected behavior with full IAM integration")
- // This test documents the expected behavior when checkGovernanceBypassPermission
- // is called with a full IAM system:
- //
- // 1. Function calls s3a.iam.authRequest() with the bypass action
- // 2. If authRequest returns errCode != s3err.ErrNone, function returns false
- // 3. If authRequest succeeds, function checks identity.canDo() with the bypass action
- // 4. If canDo() returns true, function returns true
- // 5. If bypass permission fails, function checks admin action with identity.canDo()
- // 6. If admin action succeeds, function returns true and logs admin access
- // 7. If all checks fail, function returns false
- //
- // The function correctly uses:
- // - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission
- // - s3_constants.ACTION_ADMIN for admin permission
- // - Proper resource path generation with bucket/object format
- // - Trimming of leading slashes from object names
- }
- // TestGovernanceBypassPermission was removed because it tested the old
- // insecure behavior of trusting the AmzIsAdmin header. The new implementation
- // uses proper IAM authentication instead of relying on client-provided headers.
- // Test specifically for users with IAM bypass permission
- func TestGovernanceBypassWithIAMPermission(t *testing.T) {
- // This test demonstrates the expected behavior for non-admin users with bypass permission
- // In a real implementation, this would integrate with the full IAM system
- t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
- // The expected behavior would be:
- // 1. Non-admin user makes request with bypass header
- // 2. checkGovernanceBypassPermission calls s3a.iam.authRequest
- // 3. authRequest validates user identity and checks permissions
- // 4. If user has s3:BypassGovernanceRetention permission, return true
- // 5. Otherwise return false
- // For now, the function correctly returns false for non-admin users
- // when the IAM system doesn't have the user configured with bypass permission
- }
- func TestGovernancePermissionIntegration(t *testing.T) {
- // Note: This test demonstrates the expected integration behavior
- // In a real implementation, this would require setting up a proper IAM mock
- // with identities that have the bypass governance permission
- t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
- // This test would verify:
- // 1. User with BypassGovernanceRetention permission can bypass governance
- // 2. User without permission cannot bypass governance
- // 3. Admin users can always bypass governance
- // 4. Anonymous users cannot bypass governance
- }
- func TestGovernanceBypassHeader(t *testing.T) {
- tests := []struct {
- name string
- headerValue string
- expectedResult bool
- description string
- }{
- {
- name: "bypass_header_true",
- headerValue: "true",
- expectedResult: true,
- description: "Header with 'true' value should enable bypass",
- },
- {
- name: "bypass_header_false",
- headerValue: "false",
- expectedResult: false,
- description: "Header with 'false' value should not enable bypass",
- },
- {
- name: "bypass_header_empty",
- headerValue: "",
- expectedResult: false,
- description: "Empty header should not enable bypass",
- },
- {
- name: "bypass_header_invalid",
- headerValue: "invalid",
- expectedResult: false,
- description: "Invalid header value should not enable bypass",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- req := httptest.NewRequest("DELETE", "/bucket/object", nil)
- if tt.headerValue != "" {
- req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
- }
- result := req.Header.Get("x-amz-bypass-governance-retention") == "true"
- if result != tt.expectedResult {
- t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description)
- }
- })
- }
- }
- func TestGovernanceRetentionModeChecking(t *testing.T) {
- tests := []struct {
- name string
- retentionMode string
- bypassGovernance bool
- hasPermission bool
- expectedError bool
- expectedErrorType string
- description string
- }{
- {
- name: "compliance_mode_cannot_bypass",
- retentionMode: s3_constants.RetentionModeCompliance,
- bypassGovernance: true,
- hasPermission: true,
- expectedError: true,
- expectedErrorType: "compliance mode",
- description: "Compliance mode should not be bypassable even with permission",
- },
- {
- name: "governance_mode_without_bypass",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: false,
- hasPermission: false,
- expectedError: true,
- expectedErrorType: "governance mode",
- description: "Governance mode should be blocked without bypass",
- },
- {
- name: "governance_mode_with_bypass_no_permission",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: true,
- hasPermission: false,
- expectedError: true,
- expectedErrorType: "permission",
- description: "Governance mode bypass should fail without permission",
- },
- {
- name: "governance_mode_with_bypass_and_permission",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: true,
- hasPermission: true,
- expectedError: false,
- expectedErrorType: "",
- description: "Governance mode bypass should succeed with permission",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Test validates the logic without actually needing the full implementation
- // This demonstrates the expected behavior patterns
- var hasError bool
- var errorType string
- if tt.retentionMode == s3_constants.RetentionModeCompliance {
- hasError = true
- errorType = "compliance mode"
- } else if tt.retentionMode == s3_constants.RetentionModeGovernance {
- if !tt.bypassGovernance {
- hasError = true
- errorType = "governance mode"
- } else if !tt.hasPermission {
- hasError = true
- errorType = "permission"
- }
- }
- if hasError != tt.expectedError {
- t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description)
- }
- if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) {
- t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description)
- }
- })
- }
- }
- func TestGovernancePermissionActionGeneration(t *testing.T) {
- tests := []struct {
- name string
- bucket string
- object string
- expectedAction string
- description string
- }{
- {
- name: "bucket_and_object_action",
- bucket: "test-bucket",
- object: "/test-object", // Object has "/" prefix from GetBucketAndObject
- expectedAction: "BypassGovernanceRetention:test-bucket/test-object",
- description: "Action should be generated correctly for bucket and object",
- },
- {
- name: "bucket_only_action",
- bucket: "test-bucket",
- object: "",
- expectedAction: "BypassGovernanceRetention:test-bucket",
- description: "Action should be generated correctly for bucket only",
- },
- {
- name: "nested_object_action",
- bucket: "test-bucket",
- object: "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject
- expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object",
- description: "Action should be generated correctly for nested objects",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object
- if action != tt.expectedAction {
- t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description)
- }
- })
- }
- }
- // TestGovernancePermissionEndToEnd tests the complete object lock permission flow
- func TestGovernancePermissionEndToEnd(t *testing.T) {
- t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior")
- // This test demonstrates the end-to-end flow that would be tested in a full integration test
- // The checkObjectLockPermissions method is called by:
- // 1. DeleteObjectHandler - when versioning is enabled and object lock is configured
- // 2. DeleteMultipleObjectsHandler - for each object in versioned buckets
- // 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets
- // 4. PutObjectRetentionHandler - when setting retention on objects
- //
- // Each handler:
- // - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header
- // - Calls checkObjectLockPermissions with the appropriate parameters
- // - Handles the returned errors appropriately (ErrAccessDenied, etc.)
- //
- // The method integrates with the IAM system through checkGovernanceBypassPermission
- // which validates the s3:BypassGovernanceRetention permission
- }
- // TestGovernancePermissionHTTPFlow tests the HTTP header processing and method calls
- func TestGovernancePermissionHTTPFlow(t *testing.T) {
- tests := []struct {
- name string
- headerValue string
- expectedBypassGovernance bool
- }{
- {
- name: "bypass_header_true",
- headerValue: "true",
- expectedBypassGovernance: true,
- },
- {
- name: "bypass_header_false",
- headerValue: "false",
- expectedBypassGovernance: false,
- },
- {
- name: "bypass_header_missing",
- headerValue: "",
- expectedBypassGovernance: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Create a mock HTTP request
- req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
- if tt.headerValue != "" {
- req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
- }
- // Test the header processing logic used in handlers
- bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
- if bypassGovernance != tt.expectedBypassGovernance {
- t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance)
- }
- })
- }
- }
- // TestGovernancePermissionMethodCalls tests that the governance permission methods are called correctly
- func TestGovernancePermissionMethodCalls(t *testing.T) {
- // Test that demonstrates the method call pattern used in handlers
- // This is the pattern used in DeleteObjectHandler:
- t.Run("delete_object_handler_pattern", func(t *testing.T) {
- req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
- req.Header.Set("x-amz-bypass-governance-retention", "true")
- // Extract parameters as done in the handler
- bucket, object := s3_constants.GetBucketAndObject(req)
- versionId := req.URL.Query().Get("versionId")
- bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
- // Verify the parameters are extracted correctly
- // Note: The actual bucket and object extraction depends on the URL structure
- t.Logf("Extracted bucket: %s, object: %s", bucket, object)
- if versionId != "" {
- t.Errorf("Expected versionId to be empty, got %v", versionId)
- }
- if !bypassGovernance {
- t.Errorf("Expected bypassGovernance to be true")
- }
- })
- // This is the pattern used in PutObjectHandler:
- t.Run("put_object_handler_pattern", func(t *testing.T) {
- req, _ := http.NewRequest("PUT", "/bucket/test-object", nil)
- req.Header.Set("x-amz-bypass-governance-retention", "true")
- // Extract parameters as done in the handler
- bucket, object := s3_constants.GetBucketAndObject(req)
- bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
- versioningEnabled := true // Would be determined by isVersioningEnabled(bucket)
- // Verify the parameters are extracted correctly
- // Note: The actual bucket and object extraction depends on the URL structure
- t.Logf("Extracted bucket: %s, object: %s", bucket, object)
- if !bypassGovernance {
- t.Errorf("Expected bypassGovernance to be true")
- }
- if !versioningEnabled {
- t.Errorf("Expected versioningEnabled to be true")
- }
- })
- }
- // TestGovernanceBypassNotPermittedError tests that ErrGovernanceBypassNotPermitted
- // is returned when bypass is requested but the user lacks permission
- func TestGovernanceBypassNotPermittedError(t *testing.T) {
- // Test the error constant itself
- if ErrGovernanceBypassNotPermitted == nil {
- t.Error("ErrGovernanceBypassNotPermitted should be defined")
- }
- // Verify the error message
- expectedMessage := "user does not have permission to bypass governance retention"
- if ErrGovernanceBypassNotPermitted.Error() != expectedMessage {
- t.Errorf("expected error message '%s', got '%s'",
- expectedMessage, ErrGovernanceBypassNotPermitted.Error())
- }
- // Test the scenario where this error should be returned
- // This documents the expected behavior when:
- // 1. Object is under governance retention
- // 2. bypassGovernance is true
- // 3. checkGovernanceBypassPermission returns false
- testCases := []struct {
- name string
- retentionMode string
- bypassGovernance bool
- hasPermission bool
- expectedError error
- description string
- }{
- {
- name: "governance_bypass_without_permission",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: true,
- hasPermission: false,
- expectedError: ErrGovernanceBypassNotPermitted,
- description: "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission",
- },
- {
- name: "governance_bypass_with_permission",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: true,
- hasPermission: true,
- expectedError: nil,
- description: "Should succeed when bypass is requested and user has permission",
- },
- {
- name: "governance_no_bypass",
- retentionMode: s3_constants.RetentionModeGovernance,
- bypassGovernance: false,
- hasPermission: false,
- expectedError: ErrGovernanceModeActive,
- description: "Should return ErrGovernanceModeActive when bypass is not requested",
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- // This test documents the expected behavior pattern
- // The actual checkObjectLockPermissions method implements this logic:
- // if retention.Mode == s3_constants.RetentionModeGovernance {
- // if !bypassGovernance {
- // return ErrGovernanceModeActive
- // }
- // if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
- // return ErrGovernanceBypassNotPermitted
- // }
- // }
- var simulatedError error
- if tc.retentionMode == s3_constants.RetentionModeGovernance {
- if !tc.bypassGovernance {
- simulatedError = ErrGovernanceModeActive
- } else if !tc.hasPermission {
- simulatedError = ErrGovernanceBypassNotPermitted
- }
- }
- if simulatedError != tc.expectedError {
- t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description)
- }
- // Verify ErrGovernanceBypassNotPermitted is returned in the right case
- if tc.name == "governance_bypass_without_permission" && simulatedError != ErrGovernanceBypassNotPermitted {
- t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError)
- }
- })
- }
- }
|