s3_multipart_iam.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. package s3api
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  11. )
// S3MultipartIAMManager handles IAM integration for multipart upload operations.
type S3MultipartIAMManager struct {
	// s3iam is the IAM integration handle stored by NewS3MultipartIAMManager.
	s3iam *S3IAMIntegration
}
  16. // NewS3MultipartIAMManager creates a new multipart IAM manager
  17. func NewS3MultipartIAMManager(s3iam *S3IAMIntegration) *S3MultipartIAMManager {
  18. return &S3MultipartIAMManager{
  19. s3iam: s3iam,
  20. }
  21. }
// MultipartUploadRequest represents a multipart upload request in the form
// that MultipartUploadPolicy.ValidateMultipartRequestWithPolicy validates.
type MultipartUploadRequest struct {
	// Bucket is the S3 bucket name.
	Bucket string `json:"bucket"`

	// ObjectKey is the S3 object key.
	ObjectKey string `json:"object_key"`

	// UploadID is the multipart upload ID.
	UploadID string `json:"upload_id"`

	// PartNumber is the part number; it is only checked for upload_part
	// operations.
	PartNumber int `json:"part_number"`

	// Operation is the multipart operation type, stored as the string form of
	// a MultipartOperation constant (e.g. string(MultipartOpUploadPart)).
	Operation string `json:"operation"`

	// SessionToken is the JWT session token.
	SessionToken string `json:"session_token"`

	// Headers holds the request headers, one value per header name.
	Headers map[string]string `json:"headers"`

	// ContentSize is the content size in bytes used for part-size validation.
	ContentSize int64 `json:"content_size"`
}
// MultipartUploadPolicy represents security policies for multipart uploads.
//
// Of these fields, ValidateMultipartRequestWithPolicy enforces MaxPartSize,
// MaxParts, RequiredHeaders and AllowedContentTypes. MinPartSize is only
// logged there, and MaxUploadDuration and IPWhitelist are not consulted by any
// code in this part of the file.
type MultipartUploadPolicy struct {
	// MaxPartSize is the maximum part size in bytes (5GB AWS limit).
	MaxPartSize int64 `json:"max_part_size"`

	// MinPartSize is the minimum part size in bytes (5MB AWS limit, except
	// for the last part).
	MinPartSize int64 `json:"min_part_size"`

	// MaxParts is the maximum number of parts (10,000 AWS limit); valid part
	// numbers are 1..MaxParts.
	MaxParts int `json:"max_parts"`

	// MaxUploadDuration is the maximum time allowed to complete a multipart
	// upload.
	MaxUploadDuration time.Duration `json:"max_upload_duration"`

	// AllowedContentTypes lists the allowed content types. An empty list
	// disables the content-type check.
	AllowedContentTypes []string `json:"allowed_content_types"`

	// RequiredHeaders lists header names that must be present on a request.
	RequiredHeaders []string `json:"required_headers"`

	// IPWhitelist lists allowed IP addresses/ranges.
	IPWhitelist []string `json:"ip_whitelist"`
}
// MultipartOperation represents different multipart upload operations.
// determineMultipartS3Action maps each value to the S3 action that is
// authorized for it.
type MultipartOperation string

const (
	// MultipartOpInitiate starts a new multipart upload.
	MultipartOpInitiate MultipartOperation = "initiate"
	// MultipartOpUploadPart uploads a single part of an upload.
	MultipartOpUploadPart MultipartOperation = "upload_part"
	// MultipartOpComplete completes a multipart upload.
	MultipartOpComplete MultipartOperation = "complete"
	// MultipartOpAbort aborts a multipart upload.
	MultipartOpAbort MultipartOperation = "abort"
	// MultipartOpList lists multipart uploads.
	MultipartOpList MultipartOperation = "list"
	// MultipartOpListParts lists the parts of a multipart upload.
	MultipartOpListParts MultipartOperation = "list_parts"
)
  53. // ValidateMultipartOperationWithIAM validates multipart operations using IAM policies
  54. func (iam *IdentityAccessManagement) ValidateMultipartOperationWithIAM(r *http.Request, identity *Identity, operation MultipartOperation) s3err.ErrorCode {
  55. if iam.iamIntegration == nil {
  56. // Fall back to standard validation
  57. return s3err.ErrNone
  58. }
  59. // Extract bucket and object from request
  60. bucket, object := s3_constants.GetBucketAndObject(r)
  61. // Determine the S3 action based on multipart operation
  62. action := determineMultipartS3Action(operation)
  63. // Extract session token from request
  64. sessionToken := extractSessionTokenFromRequest(r)
  65. if sessionToken == "" {
  66. // No session token - use standard auth
  67. return s3err.ErrNone
  68. }
  69. // Retrieve the actual principal ARN from the request header
  70. // This header is set during initial authentication and contains the correct assumed role ARN
  71. principalArn := r.Header.Get("X-SeaweedFS-Principal")
  72. if principalArn == "" {
  73. glog.V(0).Info("IAM authorization for multipart operation failed: missing principal ARN in request header")
  74. return s3err.ErrAccessDenied
  75. }
  76. // Create IAM identity for authorization
  77. iamIdentity := &IAMIdentity{
  78. Name: identity.Name,
  79. Principal: principalArn,
  80. SessionToken: sessionToken,
  81. Account: identity.Account,
  82. }
  83. // Authorize using IAM
  84. ctx := r.Context()
  85. errCode := iam.iamIntegration.AuthorizeAction(ctx, iamIdentity, action, bucket, object, r)
  86. if errCode != s3err.ErrNone {
  87. glog.V(3).Infof("IAM authorization failed for multipart operation: principal=%s operation=%s action=%s bucket=%s object=%s",
  88. iamIdentity.Principal, operation, action, bucket, object)
  89. return errCode
  90. }
  91. glog.V(3).Infof("IAM authorization succeeded for multipart operation: principal=%s operation=%s action=%s bucket=%s object=%s",
  92. iamIdentity.Principal, operation, action, bucket, object)
  93. return s3err.ErrNone
  94. }
  95. // ValidateMultipartRequestWithPolicy validates multipart request against security policy
  96. func (policy *MultipartUploadPolicy) ValidateMultipartRequestWithPolicy(req *MultipartUploadRequest) error {
  97. if req == nil {
  98. return fmt.Errorf("multipart request cannot be nil")
  99. }
  100. // Validate part size for upload part operations
  101. if req.Operation == string(MultipartOpUploadPart) {
  102. if req.ContentSize > policy.MaxPartSize {
  103. return fmt.Errorf("part size %d exceeds maximum allowed %d", req.ContentSize, policy.MaxPartSize)
  104. }
  105. // Minimum part size validation (except for last part)
  106. // Note: Last part validation would require knowing if this is the final part
  107. if req.ContentSize < policy.MinPartSize && req.ContentSize > 0 {
  108. glog.V(2).Infof("Part size %d is below minimum %d - assuming last part", req.ContentSize, policy.MinPartSize)
  109. }
  110. // Validate part number
  111. if req.PartNumber < 1 || req.PartNumber > policy.MaxParts {
  112. return fmt.Errorf("part number %d is invalid (must be 1-%d)", req.PartNumber, policy.MaxParts)
  113. }
  114. }
  115. // Validate required headers first
  116. if req.Headers != nil {
  117. for _, requiredHeader := range policy.RequiredHeaders {
  118. if _, exists := req.Headers[requiredHeader]; !exists {
  119. // Check lowercase version
  120. if _, exists := req.Headers[strings.ToLower(requiredHeader)]; !exists {
  121. return fmt.Errorf("required header %s is missing", requiredHeader)
  122. }
  123. }
  124. }
  125. }
  126. // Validate content type if specified
  127. if len(policy.AllowedContentTypes) > 0 && req.Headers != nil {
  128. contentType := req.Headers["Content-Type"]
  129. if contentType == "" {
  130. contentType = req.Headers["content-type"]
  131. }
  132. allowed := false
  133. for _, allowedType := range policy.AllowedContentTypes {
  134. if contentType == allowedType {
  135. allowed = true
  136. break
  137. }
  138. }
  139. if !allowed {
  140. return fmt.Errorf("content type %s is not allowed", contentType)
  141. }
  142. }
  143. return nil
  144. }
  145. // Enhanced multipart handlers with IAM integration
  146. // NewMultipartUploadWithIAM handles initiate multipart upload with IAM validation
  147. func (s3a *S3ApiServer) NewMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
  148. // Validate IAM permissions first
  149. if s3a.iam.iamIntegration != nil {
  150. if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
  151. s3err.WriteErrorResponse(w, r, errCode)
  152. return
  153. } else {
  154. // Additional multipart-specific IAM validation
  155. if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpInitiate); errCode != s3err.ErrNone {
  156. s3err.WriteErrorResponse(w, r, errCode)
  157. return
  158. }
  159. }
  160. }
  161. // Delegate to existing handler
  162. s3a.NewMultipartUploadHandler(w, r)
  163. }
  164. // CompleteMultipartUploadWithIAM handles complete multipart upload with IAM validation
  165. func (s3a *S3ApiServer) CompleteMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
  166. // Validate IAM permissions first
  167. if s3a.iam.iamIntegration != nil {
  168. if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
  169. s3err.WriteErrorResponse(w, r, errCode)
  170. return
  171. } else {
  172. // Additional multipart-specific IAM validation
  173. if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpComplete); errCode != s3err.ErrNone {
  174. s3err.WriteErrorResponse(w, r, errCode)
  175. return
  176. }
  177. }
  178. }
  179. // Delegate to existing handler
  180. s3a.CompleteMultipartUploadHandler(w, r)
  181. }
  182. // AbortMultipartUploadWithIAM handles abort multipart upload with IAM validation
  183. func (s3a *S3ApiServer) AbortMultipartUploadWithIAM(w http.ResponseWriter, r *http.Request) {
  184. // Validate IAM permissions first
  185. if s3a.iam.iamIntegration != nil {
  186. if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
  187. s3err.WriteErrorResponse(w, r, errCode)
  188. return
  189. } else {
  190. // Additional multipart-specific IAM validation
  191. if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpAbort); errCode != s3err.ErrNone {
  192. s3err.WriteErrorResponse(w, r, errCode)
  193. return
  194. }
  195. }
  196. }
  197. // Delegate to existing handler
  198. s3a.AbortMultipartUploadHandler(w, r)
  199. }
  200. // ListMultipartUploadsWithIAM handles list multipart uploads with IAM validation
  201. func (s3a *S3ApiServer) ListMultipartUploadsWithIAM(w http.ResponseWriter, r *http.Request) {
  202. // Validate IAM permissions first
  203. if s3a.iam.iamIntegration != nil {
  204. if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_LIST); errCode != s3err.ErrNone {
  205. s3err.WriteErrorResponse(w, r, errCode)
  206. return
  207. } else {
  208. // Additional multipart-specific IAM validation
  209. if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpList); errCode != s3err.ErrNone {
  210. s3err.WriteErrorResponse(w, r, errCode)
  211. return
  212. }
  213. }
  214. }
  215. // Delegate to existing handler
  216. s3a.ListMultipartUploadsHandler(w, r)
  217. }
  218. // UploadPartWithIAM handles upload part with IAM validation
  219. func (s3a *S3ApiServer) UploadPartWithIAM(w http.ResponseWriter, r *http.Request) {
  220. // Validate IAM permissions first
  221. if s3a.iam.iamIntegration != nil {
  222. if identity, errCode := s3a.iam.authRequest(r, s3_constants.ACTION_WRITE); errCode != s3err.ErrNone {
  223. s3err.WriteErrorResponse(w, r, errCode)
  224. return
  225. } else {
  226. // Additional multipart-specific IAM validation
  227. if errCode := s3a.iam.ValidateMultipartOperationWithIAM(r, identity, MultipartOpUploadPart); errCode != s3err.ErrNone {
  228. s3err.WriteErrorResponse(w, r, errCode)
  229. return
  230. }
  231. // Validate part size and other policies
  232. if err := s3a.validateUploadPartRequest(r); err != nil {
  233. glog.Errorf("Upload part validation failed: %v", err)
  234. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
  235. return
  236. }
  237. }
  238. }
  239. // Delegate to existing object PUT handler (which handles upload part)
  240. s3a.PutObjectHandler(w, r)
  241. }
  242. // Helper functions
  243. // determineMultipartS3Action maps multipart operations to granular S3 actions
  244. // This enables fine-grained IAM policies for multipart upload operations
  245. func determineMultipartS3Action(operation MultipartOperation) Action {
  246. switch operation {
  247. case MultipartOpInitiate:
  248. return s3_constants.ACTION_CREATE_MULTIPART_UPLOAD
  249. case MultipartOpUploadPart:
  250. return s3_constants.ACTION_UPLOAD_PART
  251. case MultipartOpComplete:
  252. return s3_constants.ACTION_COMPLETE_MULTIPART
  253. case MultipartOpAbort:
  254. return s3_constants.ACTION_ABORT_MULTIPART
  255. case MultipartOpList:
  256. return s3_constants.ACTION_LIST_MULTIPART_UPLOADS
  257. case MultipartOpListParts:
  258. return s3_constants.ACTION_LIST_PARTS
  259. default:
  260. // Fail closed for unmapped operations to prevent unintended access
  261. glog.Errorf("unmapped multipart operation: %s", operation)
  262. return "s3:InternalErrorUnknownMultipartAction" // Non-existent action ensures denial
  263. }
  264. }
  265. // extractSessionTokenFromRequest extracts session token from various request sources
  266. func extractSessionTokenFromRequest(r *http.Request) string {
  267. // Check Authorization header for Bearer token
  268. if authHeader := r.Header.Get("Authorization"); authHeader != "" {
  269. if strings.HasPrefix(authHeader, "Bearer ") {
  270. return strings.TrimPrefix(authHeader, "Bearer ")
  271. }
  272. }
  273. // Check X-Amz-Security-Token header
  274. if token := r.Header.Get("X-Amz-Security-Token"); token != "" {
  275. return token
  276. }
  277. // Check query parameters for presigned URL tokens
  278. if token := r.URL.Query().Get("X-Amz-Security-Token"); token != "" {
  279. return token
  280. }
  281. return ""
  282. }
  283. // validateUploadPartRequest validates upload part request against policies
  284. func (s3a *S3ApiServer) validateUploadPartRequest(r *http.Request) error {
  285. // Get default multipart policy
  286. policy := DefaultMultipartUploadPolicy()
  287. // Extract part number from query
  288. partNumberStr := r.URL.Query().Get("partNumber")
  289. if partNumberStr == "" {
  290. return fmt.Errorf("missing partNumber parameter")
  291. }
  292. partNumber, err := strconv.Atoi(partNumberStr)
  293. if err != nil {
  294. return fmt.Errorf("invalid partNumber: %v", err)
  295. }
  296. // Get content length
  297. contentLength := r.ContentLength
  298. if contentLength < 0 {
  299. contentLength = 0
  300. }
  301. // Create multipart request for validation
  302. bucket, object := s3_constants.GetBucketAndObject(r)
  303. multipartReq := &MultipartUploadRequest{
  304. Bucket: bucket,
  305. ObjectKey: object,
  306. PartNumber: partNumber,
  307. Operation: string(MultipartOpUploadPart),
  308. ContentSize: contentLength,
  309. Headers: make(map[string]string),
  310. }
  311. // Copy relevant headers
  312. for key, values := range r.Header {
  313. if len(values) > 0 {
  314. multipartReq.Headers[key] = values[0]
  315. }
  316. }
  317. // Validate against policy
  318. return policy.ValidateMultipartRequestWithPolicy(multipartReq)
  319. }
  320. // DefaultMultipartUploadPolicy returns a default multipart upload security policy
  321. func DefaultMultipartUploadPolicy() *MultipartUploadPolicy {
  322. return &MultipartUploadPolicy{
  323. MaxPartSize: 5 * 1024 * 1024 * 1024, // 5GB AWS limit
  324. MinPartSize: 5 * 1024 * 1024, // 5MB AWS minimum (except last part)
  325. MaxParts: 10000, // AWS limit
  326. MaxUploadDuration: 7 * 24 * time.Hour, // 7 days to complete upload
  327. AllowedContentTypes: []string{}, // Empty means all types allowed
  328. RequiredHeaders: []string{}, // No required headers by default
  329. IPWhitelist: []string{}, // Empty means no IP restrictions
  330. }
  331. }
// MultipartUploadSession represents an ongoing multipart upload session.
type MultipartUploadSession struct {
	// UploadID is the multipart upload ID.
	UploadID string `json:"upload_id"`

	// Bucket is the bucket the upload targets.
	Bucket string `json:"bucket"`

	// ObjectKey is the key of the object being uploaded.
	ObjectKey string `json:"object_key"`

	// Initiator is the user who initiated the upload.
	Initiator string `json:"initiator"`

	// Owner is the object owner.
	Owner string `json:"owner"`

	// CreatedAt is when the upload was initiated.
	CreatedAt time.Time `json:"created_at"`

	// Parts lists the parts uploaded so far.
	Parts []MultipartUploadPart `json:"parts"`

	// Metadata is the object metadata.
	Metadata map[string]string `json:"metadata"`

	// Policy is the security policy applied to the upload.
	Policy *MultipartUploadPolicy `json:"policy"`

	// SessionToken is the IAM session token.
	SessionToken string `json:"session_token"`
}
// MultipartUploadPart represents an uploaded part.
type MultipartUploadPart struct {
	// PartNumber is the number of this part within its upload.
	PartNumber int `json:"part_number"`

	// Size is the size of the part.
	Size int64 `json:"size"`

	// ETag is the ETag of the part.
	ETag string `json:"etag"`

	// LastModified is the last modification time of the part.
	LastModified time.Time `json:"last_modified"`

	// Checksum is an optional integrity checksum.
	Checksum string `json:"checksum"`
}
  353. // GetMultipartUploadSessions retrieves active multipart upload sessions for a bucket
  354. func (s3a *S3ApiServer) GetMultipartUploadSessions(bucket string) ([]*MultipartUploadSession, error) {
  355. // This would typically query the filer for active multipart uploads
  356. // For now, return empty list as this is a placeholder for the full implementation
  357. return []*MultipartUploadSession{}, nil
  358. }
  359. // CleanupExpiredMultipartUploads removes expired multipart upload sessions
  360. func (s3a *S3ApiServer) CleanupExpiredMultipartUploads(maxAge time.Duration) error {
  361. // This would typically scan for and remove expired multipart uploads
  362. // Implementation would depend on how multipart sessions are stored in the filer
  363. glog.V(2).Infof("Cleanup expired multipart uploads older than %v", maxAge)
  364. return nil
  365. }