// s3api_object_handlers_delete.go (15 KB)
  1. package s3api
  2. import (
  3. "encoding/xml"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "slices"
  8. "strings"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. const (
  18. deleteMultipleObjectsLimit = 1000
  19. )
  20. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  21. bucket, object := s3_constants.GetBucketAndObject(r)
  22. glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
  23. // Check for specific version ID in query parameters
  24. versionId := r.URL.Query().Get("versionId")
  25. // Get detailed versioning state for proper handling of suspended vs enabled versioning
  26. versioningState, err := s3a.getVersioningState(bucket)
  27. if err != nil {
  28. if err == filer_pb.ErrNotFound {
  29. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  30. return
  31. }
  32. glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
  33. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  34. return
  35. }
  36. versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
  37. versioningSuspended := (versioningState == s3_constants.VersioningSuspended)
  38. versioningConfigured := (versioningState != "")
  39. var auditLog *s3err.AccessLog
  40. if s3err.Logger != nil {
  41. auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
  42. }
  43. if versioningConfigured {
  44. // Handle versioned delete based on specific versioning state
  45. if versionId != "" {
  46. // Delete specific version (same for both enabled and suspended)
  47. // Check object lock permissions before deleting specific version
  48. governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
  49. if err := s3a.enforceObjectLockProtections(r, bucket, object, versionId, governanceBypassAllowed); err != nil {
  50. glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
  51. s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
  52. return
  53. }
  54. // Delete specific version
  55. err := s3a.deleteSpecificObjectVersion(bucket, object, versionId)
  56. if err != nil {
  57. glog.Errorf("Failed to delete specific version %s: %v", versionId, err)
  58. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  59. return
  60. }
  61. // Set version ID in response header
  62. w.Header().Set("x-amz-version-id", versionId)
  63. } else {
  64. // Delete without version ID - behavior depends on versioning state
  65. if versioningEnabled {
  66. // Enabled versioning: Create delete marker (logical delete)
  67. // AWS S3 behavior: Delete marker creation is NOT blocked by object retention
  68. // because it's a logical delete that doesn't actually remove the retained version
  69. deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object)
  70. if err != nil {
  71. glog.Errorf("Failed to create delete marker: %v", err)
  72. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  73. return
  74. }
  75. // Set delete marker version ID in response header
  76. w.Header().Set("x-amz-version-id", deleteMarkerVersionId)
  77. w.Header().Set("x-amz-delete-marker", "true")
  78. } else if versioningSuspended {
  79. // Suspended versioning: Actually delete the "null" version object
  80. glog.V(2).Infof("DeleteObjectHandler: deleting null version for suspended versioning %s/%s", bucket, object)
  81. // Check object lock permissions before deleting "null" version
  82. governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
  83. if err := s3a.enforceObjectLockProtections(r, bucket, object, "null", governanceBypassAllowed); err != nil {
  84. glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
  85. s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
  86. return
  87. }
  88. // Delete the "null" version (the regular file)
  89. err := s3a.deleteSpecificObjectVersion(bucket, object, "null")
  90. if err != nil {
  91. glog.Errorf("Failed to delete null version: %v", err)
  92. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  93. return
  94. }
  95. // Note: According to AWS S3 spec, suspended versioning should NOT return version ID headers
  96. // The object is deleted but no version information is returned
  97. }
  98. }
  99. } else {
  100. // Handle regular delete (non-versioned)
  101. // Check object lock permissions before deleting object
  102. governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
  103. if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
  104. glog.V(2).Infof("DeleteObjectHandler: object lock check failed for %s/%s: %v", bucket, object, err)
  105. s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
  106. return
  107. }
  108. target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
  109. dir, name := target.DirAndName()
  110. err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  111. if err := doDeleteEntry(client, dir, name, true, false); err != nil {
  112. return err
  113. }
  114. if s3a.option.AllowEmptyFolder {
  115. return nil
  116. }
  117. directoriesWithDeletion := make(map[string]int)
  118. if strings.LastIndex(object, "/") > 0 {
  119. directoriesWithDeletion[dir]++
  120. // purge empty folders, only checking folders with deletions
  121. for len(directoriesWithDeletion) > 0 {
  122. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  123. }
  124. }
  125. return nil
  126. })
  127. if err != nil {
  128. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  129. return
  130. }
  131. }
  132. if auditLog != nil {
  133. auditLog.Key = strings.TrimPrefix(object, "/")
  134. s3err.PostAccessLog(*auditLog)
  135. }
  136. stats_collect.RecordBucketActiveTime(bucket)
  137. stats_collect.S3DeletedObjectsCounter.WithLabelValues(bucket).Inc()
  138. w.WriteHeader(http.StatusNoContent)
  139. }
  140. // ObjectIdentifier represents an object to be deleted with its key name and optional version ID.
  141. type ObjectIdentifier struct {
  142. Key string `xml:"Key"`
  143. VersionId string `xml:"VersionId,omitempty"`
  144. DeleteMarker bool `xml:"DeleteMarker,omitempty"`
  145. DeleteMarkerVersionId string `xml:"DeleteMarkerVersionId,omitempty"`
  146. }
  147. // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
  148. type DeleteObjectsRequest struct {
  149. // Element to enable quiet mode for the request
  150. Quiet bool
  151. // List of objects to be deleted
  152. Objects []ObjectIdentifier `xml:"Object"`
  153. }
  154. // DeleteError structure.
  155. type DeleteError struct {
  156. Code string `xml:"Code"`
  157. Message string `xml:"Message"`
  158. Key string `xml:"Key"`
  159. VersionId string `xml:"VersionId,omitempty"`
  160. }
  161. // DeleteObjectsResponse container for multiple object deletes.
  162. type DeleteObjectsResponse struct {
  163. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
  164. // Collection of all deleted objects
  165. DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
  166. // Collection of errors deleting certain objects.
  167. Errors []DeleteError `xml:"Error,omitempty"`
  168. }
  169. // DeleteMultipleObjectsHandler - Delete multiple objects
  170. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  171. bucket, _ := s3_constants.GetBucketAndObject(r)
  172. glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
  173. deleteXMLBytes, err := io.ReadAll(r.Body)
  174. if err != nil {
  175. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  176. return
  177. }
  178. deleteObjects := &DeleteObjectsRequest{}
  179. if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
  180. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  181. return
  182. }
  183. if len(deleteObjects.Objects) > deleteMultipleObjectsLimit {
  184. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects)
  185. return
  186. }
  187. var deletedObjects []ObjectIdentifier
  188. var deleteErrors []DeleteError
  189. var auditLog *s3err.AccessLog
  190. directoriesWithDeletion := make(map[string]int)
  191. if s3err.Logger != nil {
  192. auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
  193. }
  194. // Get detailed versioning state for proper handling of suspended vs enabled versioning
  195. versioningState, err := s3a.getVersioningState(bucket)
  196. if err != nil {
  197. if err == filer_pb.ErrNotFound {
  198. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  199. return
  200. }
  201. glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
  202. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  203. return
  204. }
  205. versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
  206. versioningSuspended := (versioningState == s3_constants.VersioningSuspended)
  207. versioningConfigured := (versioningState != "")
  208. s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  209. // delete file entries
  210. for _, object := range deleteObjects.Objects {
  211. if object.Key == "" {
  212. continue
  213. }
  214. // Check object lock permissions before deletion (only for versioned buckets)
  215. if versioningConfigured {
  216. // Validate governance bypass for this specific object
  217. governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object.Key)
  218. if err := s3a.enforceObjectLockProtections(r, bucket, object.Key, object.VersionId, governanceBypassAllowed); err != nil {
  219. glog.V(2).Infof("DeleteMultipleObjectsHandler: object lock check failed for %s/%s (version: %s): %v", bucket, object.Key, object.VersionId, err)
  220. deleteErrors = append(deleteErrors, DeleteError{
  221. Code: s3err.GetAPIError(s3err.ErrAccessDenied).Code,
  222. Message: s3err.GetAPIError(s3err.ErrAccessDenied).Description,
  223. Key: object.Key,
  224. VersionId: object.VersionId,
  225. })
  226. continue
  227. }
  228. }
  229. var deleteVersionId string
  230. var isDeleteMarker bool
  231. if versioningConfigured {
  232. // Handle versioned delete based on specific versioning state
  233. if object.VersionId != "" {
  234. // Delete specific version (same for both enabled and suspended)
  235. err := s3a.deleteSpecificObjectVersion(bucket, object.Key, object.VersionId)
  236. if err != nil {
  237. deleteErrors = append(deleteErrors, DeleteError{
  238. Code: "",
  239. Message: err.Error(),
  240. Key: object.Key,
  241. VersionId: object.VersionId,
  242. })
  243. continue
  244. }
  245. deleteVersionId = object.VersionId
  246. } else {
  247. // Delete without version ID - behavior depends on versioning state
  248. if versioningEnabled {
  249. // Enabled versioning: Create delete marker (logical delete)
  250. deleteMarkerVersionId, err := s3a.createDeleteMarker(bucket, object.Key)
  251. if err != nil {
  252. deleteErrors = append(deleteErrors, DeleteError{
  253. Code: "",
  254. Message: err.Error(),
  255. Key: object.Key,
  256. VersionId: object.VersionId,
  257. })
  258. continue
  259. }
  260. deleteVersionId = deleteMarkerVersionId
  261. isDeleteMarker = true
  262. } else if versioningSuspended {
  263. // Suspended versioning: Actually delete the "null" version object
  264. glog.V(2).Infof("DeleteMultipleObjectsHandler: deleting null version for suspended versioning %s/%s", bucket, object.Key)
  265. err := s3a.deleteSpecificObjectVersion(bucket, object.Key, "null")
  266. if err != nil {
  267. deleteErrors = append(deleteErrors, DeleteError{
  268. Code: "",
  269. Message: err.Error(),
  270. Key: object.Key,
  271. VersionId: "null",
  272. })
  273. continue
  274. }
  275. deleteVersionId = "null"
  276. // Note: For suspended versioning, we don't set isDeleteMarker=true
  277. // because we actually deleted the object, not created a delete marker
  278. }
  279. }
  280. // Add to successful deletions with version info
  281. deletedObject := ObjectIdentifier{
  282. Key: object.Key,
  283. VersionId: deleteVersionId,
  284. DeleteMarker: isDeleteMarker,
  285. }
  286. // For delete markers, also set DeleteMarkerVersionId field
  287. if isDeleteMarker {
  288. deletedObject.DeleteMarkerVersionId = deleteVersionId
  289. // Don't set VersionId for delete markers, use DeleteMarkerVersionId instead
  290. deletedObject.VersionId = ""
  291. }
  292. if !deleteObjects.Quiet {
  293. deletedObjects = append(deletedObjects, deletedObject)
  294. }
  295. if isDeleteMarker {
  296. // For delete markers, we don't need to track directories for cleanup
  297. continue
  298. }
  299. } else {
  300. // Handle non-versioned delete (original logic)
  301. lastSeparator := strings.LastIndex(object.Key, "/")
  302. parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.Key, true, false
  303. if lastSeparator > 0 && lastSeparator+1 < len(object.Key) {
  304. entryName = object.Key[lastSeparator+1:]
  305. parentDirectoryPath = "/" + object.Key[:lastSeparator]
  306. }
  307. parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
  308. err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
  309. if err == nil {
  310. directoriesWithDeletion[parentDirectoryPath]++
  311. deletedObjects = append(deletedObjects, object)
  312. } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
  313. deletedObjects = append(deletedObjects, object)
  314. } else {
  315. delete(directoriesWithDeletion, parentDirectoryPath)
  316. deleteErrors = append(deleteErrors, DeleteError{
  317. Code: "",
  318. Message: err.Error(),
  319. Key: object.Key,
  320. VersionId: object.VersionId,
  321. })
  322. }
  323. }
  324. if auditLog != nil {
  325. auditLog.Key = object.Key
  326. s3err.PostAccessLog(*auditLog)
  327. }
  328. }
  329. if s3a.option.AllowEmptyFolder {
  330. return nil
  331. }
  332. // purge empty folders, only checking folders with deletions
  333. for len(directoriesWithDeletion) > 0 {
  334. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  335. }
  336. return nil
  337. })
  338. deleteResp := DeleteObjectsResponse{}
  339. if !deleteObjects.Quiet {
  340. deleteResp.DeletedObjects = deletedObjects
  341. }
  342. deleteResp.Errors = deleteErrors
  343. stats_collect.RecordBucketActiveTime(bucket)
  344. stats_collect.S3DeletedObjectsCounter.WithLabelValues(bucket).Add(float64(len(deletedObjects)))
  345. writeSuccessResponseXML(w, r, deleteResp)
  346. }
  347. func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
  348. var allDirs []string
  349. for dir := range directoriesWithDeletion {
  350. allDirs = append(allDirs, dir)
  351. }
  352. slices.SortFunc(allDirs, func(a, b string) int {
  353. return len(b) - len(a)
  354. })
  355. newDirectoriesWithDeletion = make(map[string]int)
  356. for _, dir := range allDirs {
  357. parentDir, dirName := util.FullPath(dir).DirAndName()
  358. if parentDir == s3a.option.BucketsPath {
  359. continue
  360. }
  361. if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
  362. glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
  363. } else {
  364. newDirectoriesWithDeletion[parentDir]++
  365. }
  366. }
  367. return
  368. }