s3api_object_handlers_copy.go 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290
  1. package s3api
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/rand"
  6. "encoding/base64"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "net/url"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "modernc.org/strutil"
  15. "github.com/seaweedfs/seaweedfs/weed/filer"
  16. "github.com/seaweedfs/seaweedfs/weed/glog"
  17. "github.com/seaweedfs/seaweedfs/weed/operation"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  20. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. "github.com/seaweedfs/seaweedfs/weed/util"
  23. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  24. )
// Values accepted by the X-Amz-Metadata-Directive and
// X-Amz-Tagging-Directive request headers on CopyObject.
const (
	// DirectiveCopy carries metadata/tags over from the source object.
	DirectiveCopy = "COPY"
	// DirectiveReplace replaces metadata/tags with those in the request.
	DirectiveReplace = "REPLACE"
)
// CopyObjectHandler implements the S3 CopyObject API (PUT with an
// X-Amz-Copy-Source header). It resolves the source entry (honoring an
// optional source versionId and the source bucket's versioning state),
// validates the copy and its encryption parameters, merges metadata/tags
// per the COPY/REPLACE directives, copies the chunk data, and writes the
// destination either as a new version (versioned bucket) or as a regular
// file, replying with a CopyObjectResult XML body.
func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
	dstBucket, dstObject := s3_constants.GetBucketAndObject(r)

	// Copy source path. Fall back to the raw header value when it is not
	// valid URL encoding.
	cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
	if err != nil {
		// Save unescaped string as is.
		cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
	}
	srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
	glog.V(3).Infof("CopyObjectHandler %s %s (version: %s) => %s %s", srcBucket, srcObject, srcVersionId, dstBucket, dstObject)

	// Validate copy source and destination
	if err := ValidateCopySource(cpSrcPath, srcBucket, srcObject); err != nil {
		glog.V(2).Infof("CopyObjectHandler validation error: %v", err)
		errCode := MapCopyValidationError(err)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}
	if err := ValidateCopyDestination(dstBucket, dstObject); err != nil {
		glog.V(2).Infof("CopyObjectHandler validation error: %v", err)
		errCode := MapCopyValidationError(err)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}
	replaceMeta, replaceTagging := replaceDirective(r.Header)

	// Self-copy (or empty copy source) combined with a REPLACE directive:
	// rewrite the existing entry's metadata/tags in place and touch it;
	// no chunk data moves.
	if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
		fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
		dir, name := fullPath.DirAndName()
		entry, err := s3a.getEntry(dir, name)
		if err != nil || entry.IsDirectory {
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
			return
		}
		entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
		// NOTE(review): Mtime is updated before err is checked; harmless
		// since the entry is only persisted by touch() below, but checking
		// err first would read more clearly.
		entry.Attributes.Mtime = time.Now().Unix()
		if err != nil {
			glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
			return
		}
		err = s3a.touch(dir, name, entry)
		if err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
			return
		}
		writeSuccessResponseXML(w, r, CopyObjectResult{
			ETag:         fmt.Sprintf("%x", entry.Attributes.Md5),
			LastModified: time.Now().UTC(),
		})
		return
	}

	// If source object is empty or bucket is empty, reply back invalid copy source.
	if srcObject == "" || srcBucket == "" {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	// Get detailed versioning state for source bucket
	srcVersioningState, err := s3a.getVersioningState(srcBucket)
	if err != nil {
		glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	// Get the source entry with version awareness based on versioning state
	var entry *filer_pb.Entry
	if srcVersionId != "" {
		// Specific version requested - always use version-aware retrieval
		entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
	} else if srcVersioningState == s3_constants.VersioningEnabled {
		// Versioning enabled - get latest version from .versions directory
		entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
	} else if srcVersioningState == s3_constants.VersioningSuspended {
		// Versioning suspended - current object is stored as regular file ("null" version)
		// Try regular file first, fall back to latest version if needed
		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
		dir, name := srcPath.DirAndName()
		entry, err = s3a.getEntry(dir, name)
		if err != nil {
			// If regular file doesn't exist, try latest version as fallback
			glog.V(2).Infof("CopyObject: regular file not found for suspended versioning, trying latest version")
			entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
		}
	} else {
		// No versioning configured - use regular retrieval
		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
		dir, name := srcPath.DirAndName()
		entry, err = s3a.getEntry(dir, name)
	}
	// Short-circuit keeps entry.IsDirectory safe: it is only evaluated when err == nil.
	if err != nil || entry.IsDirectory {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	// A plain self-copy without a REPLACE directive is rejected (the
	// directive case was handled above).
	if srcBucket == dstBucket && srcObject == dstObject {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
		return
	}

	// Validate conditional copy headers
	if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	// Validate encryption parameters
	if err := ValidateCopyEncryption(entry.Extended, r.Header); err != nil {
		glog.V(2).Infof("CopyObjectHandler encryption validation error: %v", err)
		errCode := MapCopyValidationError(err)
		s3err.WriteErrorResponse(w, r, errCode)
		return
	}

	// Create new entry for destination. Crtime and Mime carry over from
	// the source; Mtime is the copy time.
	dstEntry := &filer_pb.Entry{
		Attributes: &filer_pb.FuseAttributes{
			FileSize: entry.Attributes.FileSize,
			Mtime:    time.Now().Unix(),
			Crtime:   entry.Attributes.Crtime,
			Mime:     entry.Attributes.Mime,
		},
		Extended: make(map[string][]byte),
	}

	// Copy extended attributes from source, filtering out conflicting encryption metadata
	for k, v := range entry.Extended {
		// Skip encryption-specific headers that might conflict with destination encryption type
		skipHeader := false
		// If we're doing cross-encryption, skip conflicting headers
		if len(entry.GetChunks()) > 0 {
			// Detect source and destination encryption types
			srcHasSSEC := IsSSECEncrypted(entry.Extended)
			srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended)
			srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended)
			dstWantsSSEC := IsSSECRequest(r)
			dstWantsSSEKMS := IsSSEKMSRequest(r)
			dstWantsSSES3 := IsSSES3RequestInternal(r)
			// Use helper function to determine if header should be skipped
			skipHeader = shouldSkipEncryptionHeader(k,
				srcHasSSEC, srcHasSSEKMS, srcHasSSES3,
				dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3)
		}
		if !skipHeader {
			dstEntry.Extended[k] = v
		}
	}

	// Process metadata and tags and apply to destination
	processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
	if tagErr != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}
	// Apply processed metadata to destination entry
	for k, v := range processedMetadata {
		dstEntry.Extended[k] = v
	}

	// For zero-size files or files without chunks, use the original approach
	if entry.Attributes.FileSize == 0 || len(entry.GetChunks()) == 0 {
		// Just copy the entry structure without chunks for zero-size files
		dstEntry.Chunks = nil
	} else {
		// Use unified copy strategy approach
		dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, dstBucket, srcObject, dstObject)
		if copyErr != nil {
			glog.Errorf("CopyObjectHandler unified copy error: %v", copyErr)
			// Map errors to appropriate S3 errors
			errCode := s3a.mapCopyErrorToS3Error(copyErr)
			s3err.WriteErrorResponse(w, r, errCode)
			return
		}
		dstEntry.Chunks = dstChunks
		// Apply destination-specific metadata (e.g., SSE-C IV and headers)
		if dstMetadata != nil {
			for k, v := range dstMetadata {
				dstEntry.Extended[k] = v
			}
			glog.V(2).Infof("Applied %d destination metadata entries for copy: %s", len(dstMetadata), r.URL.Path)
		}
	}

	// Check if destination bucket has versioning configured
	dstVersioningConfigured, err := s3a.isVersioningConfigured(dstBucket)
	if err != nil {
		glog.Errorf("Error checking versioning status for destination bucket %s: %v", dstBucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	var dstVersionId string
	var etag string
	if dstVersioningConfigured {
		// For versioned destination, create a new version
		dstVersionId = generateVersionId()
		glog.V(2).Infof("CopyObjectHandler: creating version %s for destination %s/%s", dstVersionId, dstBucket, dstObject)
		// Add version metadata to the entry
		if dstEntry.Extended == nil {
			dstEntry.Extended = make(map[string][]byte)
		}
		dstEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(dstVersionId)
		// Calculate ETag for versioning
		filerEntry := &filer.Entry{
			FullPath: util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)),
			Attr: filer.Attr{
				FileSize: dstEntry.Attributes.FileSize,
				Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0),
				Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0),
				Mime:     dstEntry.Attributes.Mime,
			},
			Chunks: dstEntry.Chunks,
		}
		etag = filer.ETagEntry(filerEntry)
		// The stored ETag is kept in quoted form.
		if !strings.HasPrefix(etag, "\"") {
			etag = "\"" + etag + "\""
		}
		dstEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
		// Create version file
		versionFileName := s3a.getVersionFileName(dstVersionId)
		versionObjectPath := dstObject + ".versions/" + versionFileName
		bucketDir := s3a.option.BucketsPath + "/" + dstBucket
		if err := s3a.mkFile(bucketDir, versionObjectPath, dstEntry.Chunks, func(entry *filer_pb.Entry) {
			entry.Attributes = dstEntry.Attributes
			entry.Extended = dstEntry.Extended
		}); err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
		// Update the .versions directory metadata
		err = s3a.updateLatestVersionInDirectory(dstBucket, dstObject, dstVersionId, versionFileName)
		if err != nil {
			glog.Errorf("CopyObjectHandler: failed to update latest version in directory: %v", err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
		// Set version ID in response header
		w.Header().Set("x-amz-version-id", dstVersionId)
	} else {
		// For non-versioned destination, use regular copy
		dstPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
		dstDir, dstName := dstPath.DirAndName()
		// Check if destination exists and remove it first (S3 copy overwrites)
		if exists, _ := s3a.exists(dstDir, dstName, false); exists {
			if err := s3a.rm(dstDir, dstName, false, false); err != nil {
				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
				return
			}
		}
		// Create the new file
		if err := s3a.mkFile(dstDir, dstName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
			entry.Attributes = dstEntry.Attributes
			entry.Extended = dstEntry.Extended
		}); err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
		// Calculate ETag
		filerEntry := &filer.Entry{
			FullPath: dstPath,
			Attr: filer.Attr{
				FileSize: dstEntry.Attributes.FileSize,
				Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0),
				Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0),
				Mime:     dstEntry.Attributes.Mime,
			},
			Chunks: dstEntry.Chunks,
		}
		etag = filer.ETagEntry(filerEntry)
	}

	setEtag(w, etag)
	response := CopyObjectResult{
		ETag:         etag,
		LastModified: time.Now().UTC(),
	}
	writeSuccessResponseXML(w, r, response)
}
  294. func pathToBucketAndObject(path string) (bucket, object string) {
  295. path = strings.TrimPrefix(path, "/")
  296. parts := strings.SplitN(path, "/", 2)
  297. if len(parts) == 2 {
  298. return parts[0], "/" + parts[1]
  299. }
  300. return parts[0], "/"
  301. }
  302. func pathToBucketObjectAndVersion(path string) (bucket, object, versionId string) {
  303. // Parse versionId from query string if present
  304. // Format: /bucket/object?versionId=version-id
  305. if idx := strings.Index(path, "?versionId="); idx != -1 {
  306. versionId = path[idx+len("?versionId="):] // dynamically calculate length
  307. path = path[:idx]
  308. }
  309. bucket, object = pathToBucketAndObject(path)
  310. return bucket, object, versionId
  311. }
// CopyPartResult is the XML response body returned by UploadPartCopy
// (CopyObjectPartHandler) on success.
type CopyPartResult struct {
	// LastModified is the time the destination part was created.
	LastModified time.Time `xml:"LastModified"`
	// ETag of the copied part, as computed for the stored part file.
	ETag string `xml:"ETag"`
}
// CopyObjectPartHandler implements the S3 UploadPartCopy API: it fills one
// part of an in-progress multipart upload from (a byte range of) an
// existing object. The source is resolved with the same version awareness
// as CopyObjectHandler, the requested range of chunks is copied, and the
// part is saved under the upload's folder before the ETag is returned.
// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
	dstBucket, dstObject := s3_constants.GetBucketAndObject(r)

	// Copy source path. Fall back to the raw header value when it is not
	// valid URL encoding.
	cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
	if err != nil {
		// Save unescaped string as is.
		cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
	}
	srcBucket, srcObject, srcVersionId := pathToBucketObjectAndVersion(cpSrcPath)
	// If source object is empty or bucket is empty, reply back invalid copy source.
	if srcObject == "" || srcBucket == "" {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	partIDString := r.URL.Query().Get("partNumber")
	uploadID := r.URL.Query().Get("uploadId")
	partID, err := strconv.Atoi(partIDString)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		return
	}

	// Check if the upload ID is valid
	err = s3a.checkUploadId(dstObject, uploadID)
	if err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}
	glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d upload %s", srcBucket, srcObject, dstBucket, partID, uploadID)

	// check partID with maximum part ID for multipart objects
	if partID > s3_constants.MaxS3MultipartParts {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		return
	}

	// Get detailed versioning state for source bucket
	srcVersioningState, err := s3a.getVersioningState(srcBucket)
	if err != nil {
		glog.Errorf("Error checking versioning state for source bucket %s: %v", srcBucket, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	// Get the source entry with version awareness based on versioning state
	var entry *filer_pb.Entry
	if srcVersionId != "" {
		// Specific version requested - always use version-aware retrieval
		entry, err = s3a.getSpecificObjectVersion(srcBucket, srcObject, srcVersionId)
	} else if srcVersioningState == s3_constants.VersioningEnabled {
		// Versioning enabled - get latest version from .versions directory
		entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
	} else if srcVersioningState == s3_constants.VersioningSuspended {
		// Versioning suspended - current object is stored as regular file ("null" version)
		// Try regular file first, fall back to latest version if needed
		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
		dir, name := srcPath.DirAndName()
		entry, err = s3a.getEntry(dir, name)
		if err != nil {
			// If regular file doesn't exist, try latest version as fallback
			glog.V(2).Infof("CopyObjectPart: regular file not found for suspended versioning, trying latest version")
			entry, err = s3a.getLatestObjectVersion(srcBucket, srcObject)
		}
	} else {
		// No versioning configured - use regular retrieval
		srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
		dir, name := srcPath.DirAndName()
		entry, err = s3a.getEntry(dir, name)
	}
	// Short-circuit keeps entry.IsDirectory safe: only evaluated when err == nil.
	if err != nil || entry.IsDirectory {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
		return
	}

	// Validate conditional copy headers
	if err := s3a.validateConditionalCopyHeaders(r, entry); err != s3err.ErrNone {
		s3err.WriteErrorResponse(w, r, err)
		return
	}

	// Handle range header if present; default is the whole object.
	rangeHeader := r.Header.Get("x-amz-copy-source-range")
	var startOffset, endOffset int64
	if rangeHeader != "" {
		startOffset, endOffset, err = parseRangeHeader(rangeHeader)
		if err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
			return
		}
	} else {
		startOffset = 0
		if entry.Attributes.FileSize == 0 {
			endOffset = -1 // For zero-size files, use -1 as endOffset
		} else {
			endOffset = int64(entry.Attributes.FileSize) - 1
		}
	}

	// Create new entry for the part. FileSize is endOffset-startOffset+1;
	// the zero-size default above (start 0, end -1) yields 0.
	dstEntry := &filer_pb.Entry{
		Attributes: &filer_pb.FuseAttributes{
			FileSize: uint64(endOffset - startOffset + 1),
			Mtime:    time.Now().Unix(),
			Crtime:   time.Now().Unix(),
			Mime:     entry.Attributes.Mime,
		},
		Extended: make(map[string][]byte),
	}

	// Handle zero-size files or empty ranges
	if entry.Attributes.FileSize == 0 || endOffset < startOffset {
		// For zero-size files or invalid ranges, create an empty part
		dstEntry.Chunks = nil
	} else {
		// Copy chunks that overlap with the range
		dstChunks, err := s3a.copyChunksForRange(entry, startOffset, endOffset, r.URL.Path)
		if err != nil {
			glog.Errorf("CopyObjectPartHandler copy chunks error: %v", err)
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
		dstEntry.Chunks = dstChunks
	}

	// Save the part entry to the multipart uploads folder
	uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID
	partName := fmt.Sprintf("%04d_%s.part", partID, "copy")
	// Check if part exists and remove it first (allow re-copying same part)
	if exists, _ := s3a.exists(uploadDir, partName, false); exists {
		if err := s3a.rm(uploadDir, partName, false, false); err != nil {
			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
			return
		}
	}
	if err := s3a.mkFile(uploadDir, partName, dstEntry.Chunks, func(entry *filer_pb.Entry) {
		entry.Attributes = dstEntry.Attributes
		entry.Extended = dstEntry.Extended
	}); err != nil {
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	// Calculate ETag for the part
	partPath := util.FullPath(uploadDir + "/" + partName)
	filerEntry := &filer.Entry{
		FullPath: partPath,
		Attr: filer.Attr{
			FileSize: dstEntry.Attributes.FileSize,
			Mtime:    time.Unix(dstEntry.Attributes.Mtime, 0),
			Crtime:   time.Unix(dstEntry.Attributes.Crtime, 0),
			Mime:     dstEntry.Attributes.Mime,
		},
		Chunks: dstEntry.Chunks,
	}
	etag := filer.ETagEntry(filerEntry)
	setEtag(w, etag)
	response := CopyPartResult{
		ETag:         etag,
		LastModified: time.Now().UTC(),
	}
	writeSuccessResponseXML(w, r, response)
}
  470. func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) {
  471. return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace
  472. }
  473. func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
  474. if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 {
  475. if sc := existing.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
  476. reqHeader.Set(s3_constants.AmzStorageClass, sc)
  477. }
  478. }
  479. if !replaceMeta {
  480. for header := range reqHeader {
  481. if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
  482. delete(reqHeader, header)
  483. }
  484. }
  485. for k, v := range existing {
  486. if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
  487. reqHeader[k] = v
  488. }
  489. }
  490. }
  491. if !replaceTagging {
  492. for header, _ := range reqHeader {
  493. if strings.HasPrefix(header, s3_constants.AmzObjectTagging) {
  494. delete(reqHeader, header)
  495. }
  496. }
  497. found := false
  498. for k, _ := range existing {
  499. if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) {
  500. found = true
  501. break
  502. }
  503. }
  504. if found {
  505. tags, err := getTags(dir, name)
  506. if err != nil {
  507. return err
  508. }
  509. var tagArr []string
  510. for k, v := range tags {
  511. tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
  512. }
  513. tagStr := strutil.JoinFields(tagArr, "&")
  514. reqHeader.Set(s3_constants.AmzObjectTagging, tagStr)
  515. }
  516. }
  517. return
  518. }
// processMetadataBytes builds the destination entry's extended metadata for a
// copy operation from the source entry's existing metadata and the incoming
// request headers.
//
// Precedence rules implemented below:
//   - storage class: source value is carried over, then overridden by the
//     request header if present
//   - SSE-KMS headers: taken from the request when the request explicitly asks
//     for "aws:kms"; otherwise preserved from the source
//   - SSE-C headers: taken from the request when a customer algorithm header is
//     present; otherwise preserved from the source
//   - user metadata (x-amz-meta-*): taken from the request when replaceMeta is
//     true, otherwise copied from the source
//   - tags: parsed and validated from the request when replaceTagging is true,
//     otherwise copied from the source
//
// Returns the new metadata map, or an error when the tagging header fails to
// parse or validate.
func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) {
	metadata = make(map[string][]byte)
	// Storage class: source value first, request header wins when both are set.
	if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
		metadata[s3_constants.AmzStorageClass] = sc
	}
	if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
		metadata[s3_constants.AmzStorageClass] = []byte(sc)
	}
	// Handle SSE-KMS headers - these are always processed from request headers if present
	if sseAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryption); sseAlgorithm == "aws:kms" {
		metadata[s3_constants.AmzServerSideEncryption] = []byte(sseAlgorithm)
		// KMS Key ID (optional - can use default key)
		if kmsKeyID := reqHeader.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); kmsKeyID != "" {
			metadata[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(kmsKeyID)
		}
		// Encryption Context (optional)
		if encryptionContext := reqHeader.Get(s3_constants.AmzServerSideEncryptionContext); encryptionContext != "" {
			metadata[s3_constants.AmzServerSideEncryptionContext] = []byte(encryptionContext)
		}
		// Bucket Key Enabled (optional)
		if bucketKeyEnabled := reqHeader.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled != "" {
			metadata[s3_constants.AmzServerSideEncryptionBucketKeyEnabled] = []byte(bucketKeyEnabled)
		}
	} else {
		// If not explicitly setting SSE-KMS, preserve existing SSE headers from source
		for _, sseHeader := range []string{
			s3_constants.AmzServerSideEncryption,
			s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
			s3_constants.AmzServerSideEncryptionContext,
			s3_constants.AmzServerSideEncryptionBucketKeyEnabled,
		} {
			if existingValue, exists := existing[sseHeader]; exists {
				metadata[sseHeader] = existingValue
			}
		}
	}
	// Handle SSE-C headers - these are always processed from request headers if present
	if sseCustomerAlgorithm := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); sseCustomerAlgorithm != "" {
		metadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte(sseCustomerAlgorithm)
		// Key MD5 is only stored alongside the algorithm header.
		if sseCustomerKeyMD5 := reqHeader.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); sseCustomerKeyMD5 != "" {
			metadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(sseCustomerKeyMD5)
		}
	} else {
		// If not explicitly setting SSE-C, preserve existing SSE-C headers from source
		for _, ssecHeader := range []string{
			s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
			s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
		} {
			if existingValue, exists := existing[ssecHeader]; exists {
				metadata[ssecHeader] = existingValue
			}
		}
	}
	if replaceMeta {
		// REPLACE directive: take x-amz-meta-* headers from the request.
		// If a header has multiple values, the last one wins.
		for header, values := range reqHeader {
			if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
				for _, value := range values {
					metadata[header] = []byte(value)
				}
			}
		}
	} else {
		// COPY directive: carry the source's user metadata over unchanged.
		for k, v := range existing {
			if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
				metadata[k] = v
			}
		}
	}
	if replaceTagging {
		// REPLACE tagging directive: parse/validate the tagging header and
		// store each tag under the "x-amz-tagging-<key>" prefix convention.
		if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
			// Note: err here shadows the named return; the error is returned
			// explicitly below, so the shadowing is harmless.
			parsedTags, err := parseTagsHeader(tags)
			if err != nil {
				return nil, err
			}
			err = ValidateTags(parsedTags)
			if err != nil {
				return nil, err
			}
			for k, v := range parsedTags {
				metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v)
			}
		}
	} else {
		// COPY tagging directive: carry the source's tag entries over.
		for k, v := range existing {
			if strings.HasPrefix(k, s3_constants.AmzObjectTagging) {
				metadata[k] = v
			}
		}
		// The tag count is derived data; drop it so it can be recomputed.
		delete(metadata, s3_constants.AmzTagCount)
	}
	return
}
  611. // copyChunks replicates chunks from source entry to destination entry
  612. func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) {
  613. dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
  614. const defaultChunkCopyConcurrency = 4
  615. executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
  616. errChan := make(chan error, len(entry.GetChunks()))
  617. for i, chunk := range entry.GetChunks() {
  618. chunkIndex := i
  619. executor.Execute(func() {
  620. dstChunk, err := s3a.copySingleChunk(chunk, dstPath)
  621. if err != nil {
  622. errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
  623. return
  624. }
  625. dstChunks[chunkIndex] = dstChunk
  626. errChan <- nil
  627. })
  628. }
  629. // Wait for all operations to complete and check for errors
  630. for i := 0; i < len(entry.GetChunks()); i++ {
  631. if err := <-errChan; err != nil {
  632. return nil, err
  633. }
  634. }
  635. return dstChunks, nil
  636. }
  637. // copySingleChunk copies a single chunk from source to destination
  638. func (s3a *S3ApiServer) copySingleChunk(chunk *filer_pb.FileChunk, dstPath string) (*filer_pb.FileChunk, error) {
  639. // Create destination chunk
  640. dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
  641. // Prepare chunk copy (assign new volume and get source URL)
  642. assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
  643. if err != nil {
  644. return nil, err
  645. }
  646. // Set file ID on destination chunk
  647. if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
  648. return nil, err
  649. }
  650. // Download and upload the chunk
  651. chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
  652. if err != nil {
  653. return nil, fmt.Errorf("download chunk data: %w", err)
  654. }
  655. if err := s3a.uploadChunkData(chunkData, assignResult); err != nil {
  656. return nil, fmt.Errorf("upload chunk data: %w", err)
  657. }
  658. return dstChunk, nil
  659. }
  660. // copySingleChunkForRange copies a portion of a chunk for range operations
  661. func (s3a *S3ApiServer) copySingleChunkForRange(originalChunk, rangeChunk *filer_pb.FileChunk, rangeStart, rangeEnd int64, dstPath string) (*filer_pb.FileChunk, error) {
  662. // Create destination chunk
  663. dstChunk := s3a.createDestinationChunk(rangeChunk, rangeChunk.Offset, rangeChunk.Size)
  664. // Prepare chunk copy (assign new volume and get source URL)
  665. assignResult, srcUrl, err := s3a.prepareChunkCopy(originalChunk.GetFileIdString(), dstPath)
  666. if err != nil {
  667. return nil, err
  668. }
  669. // Set file ID on destination chunk
  670. if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
  671. return nil, err
  672. }
  673. // Calculate the portion of the original chunk that we need to copy
  674. chunkStart := originalChunk.Offset
  675. overlapStart := max(rangeStart, chunkStart)
  676. offsetInChunk := overlapStart - chunkStart
  677. // Download and upload the chunk portion
  678. chunkData, err := s3a.downloadChunkData(srcUrl, offsetInChunk, int64(rangeChunk.Size))
  679. if err != nil {
  680. return nil, fmt.Errorf("download chunk range data: %w", err)
  681. }
  682. if err := s3a.uploadChunkData(chunkData, assignResult); err != nil {
  683. return nil, fmt.Errorf("upload chunk range data: %w", err)
  684. }
  685. return dstChunk, nil
  686. }
  687. // assignNewVolume assigns a new volume for the chunk
  688. func (s3a *S3ApiServer) assignNewVolume(dstPath string) (*filer_pb.AssignVolumeResponse, error) {
  689. var assignResult *filer_pb.AssignVolumeResponse
  690. err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  691. resp, err := client.AssignVolume(context.Background(), &filer_pb.AssignVolumeRequest{
  692. Count: 1,
  693. Replication: "",
  694. Collection: "",
  695. DiskType: "",
  696. DataCenter: s3a.option.DataCenter,
  697. Path: dstPath,
  698. })
  699. if err != nil {
  700. return fmt.Errorf("assign volume: %w", err)
  701. }
  702. if resp.Error != "" {
  703. return fmt.Errorf("assign volume: %v", resp.Error)
  704. }
  705. assignResult = resp
  706. return nil
  707. })
  708. if err != nil {
  709. return nil, err
  710. }
  711. return assignResult, nil
  712. }
  713. // min returns the minimum of two int64 values
  714. func min(a, b int64) int64 {
  715. if a < b {
  716. return a
  717. }
  718. return b
  719. }
  720. // max returns the maximum of two int64 values
  721. func max(a, b int64) int64 {
  722. if a > b {
  723. return a
  724. }
  725. return b
  726. }
  727. // parseRangeHeader parses the x-amz-copy-source-range header
  728. func parseRangeHeader(rangeHeader string) (startOffset, endOffset int64, err error) {
  729. // Remove "bytes=" prefix if present
  730. rangeStr := strings.TrimPrefix(rangeHeader, "bytes=")
  731. parts := strings.Split(rangeStr, "-")
  732. if len(parts) != 2 {
  733. return 0, 0, fmt.Errorf("invalid range format")
  734. }
  735. startOffset, err = strconv.ParseInt(parts[0], 10, 64)
  736. if err != nil {
  737. return 0, 0, fmt.Errorf("invalid start offset: %w", err)
  738. }
  739. endOffset, err = strconv.ParseInt(parts[1], 10, 64)
  740. if err != nil {
  741. return 0, 0, fmt.Errorf("invalid end offset: %w", err)
  742. }
  743. return startOffset, endOffset, nil
  744. }
  745. // copyChunksForRange copies chunks that overlap with the specified range
  746. func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, endOffset int64, dstPath string) ([]*filer_pb.FileChunk, error) {
  747. var relevantChunks []*filer_pb.FileChunk
  748. // Find chunks that overlap with the range
  749. for _, chunk := range entry.GetChunks() {
  750. chunkStart := chunk.Offset
  751. chunkEnd := chunk.Offset + int64(chunk.Size)
  752. // Check if chunk overlaps with the range
  753. if chunkStart < endOffset+1 && chunkEnd > startOffset {
  754. // Calculate the overlap
  755. overlapStart := max(startOffset, chunkStart)
  756. overlapEnd := min(endOffset+1, chunkEnd)
  757. // Create a new chunk with adjusted offset and size relative to the range
  758. newChunk := &filer_pb.FileChunk{
  759. FileId: chunk.FileId,
  760. Offset: overlapStart - startOffset, // Offset relative to the range start
  761. Size: uint64(overlapEnd - overlapStart),
  762. ModifiedTsNs: time.Now().UnixNano(),
  763. ETag: chunk.ETag,
  764. IsCompressed: chunk.IsCompressed,
  765. CipherKey: chunk.CipherKey,
  766. Fid: chunk.Fid,
  767. }
  768. relevantChunks = append(relevantChunks, newChunk)
  769. }
  770. }
  771. // Copy the relevant chunks using a specialized method for range copies
  772. dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
  773. const defaultChunkCopyConcurrency = 4
  774. executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency)
  775. errChan := make(chan error, len(relevantChunks))
  776. // Create a map to track original chunks for each relevant chunk
  777. originalChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
  778. relevantIndex := 0
  779. for _, chunk := range entry.GetChunks() {
  780. chunkStart := chunk.Offset
  781. chunkEnd := chunk.Offset + int64(chunk.Size)
  782. // Check if chunk overlaps with the range
  783. if chunkStart < endOffset+1 && chunkEnd > startOffset {
  784. originalChunks[relevantIndex] = chunk
  785. relevantIndex++
  786. }
  787. }
  788. for i, chunk := range relevantChunks {
  789. chunkIndex := i
  790. originalChunk := originalChunks[i] // Get the corresponding original chunk
  791. executor.Execute(func() {
  792. dstChunk, err := s3a.copySingleChunkForRange(originalChunk, chunk, startOffset, endOffset, dstPath)
  793. if err != nil {
  794. errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
  795. return
  796. }
  797. dstChunks[chunkIndex] = dstChunk
  798. errChan <- nil
  799. })
  800. }
  801. // Wait for all operations to complete and check for errors
  802. for i := 0; i < len(relevantChunks); i++ {
  803. if err := <-errChan; err != nil {
  804. return nil, err
  805. }
  806. }
  807. return dstChunks, nil
  808. }
// Helper methods for copy operations to avoid code duplication
// validateConditionalCopyHeaders validates the conditional copy headers against the source entry.
// It computes the source object's ETag and checks, in order:
// X-Amz-Copy-Source-If-Match, If-None-Match, If-Modified-Since and
// If-Unmodified-Since. Returns s3err.ErrNone when all present conditions
// pass, ErrPreconditionFailed when one fails, or ErrInvalidRequest for an
// unparseable date header.
func (s3a *S3ApiServer) validateConditionalCopyHeaders(r *http.Request, entry *filer_pb.Entry) s3err.ErrorCode {
	// Calculate ETag for the source entry
	// NOTE(review): srcPath is built from r.URL.Path + entry.Name via Sprintf;
	// presumably this reconstructs the source object's full path — verify
	// against the caller's conventions.
	srcPath := util.FullPath(fmt.Sprintf("%s/%s", r.URL.Path, entry.Name))
	filerEntry := &filer.Entry{
		FullPath: srcPath,
		Attr: filer.Attr{
			FileSize: entry.Attributes.FileSize,
			Mtime:    time.Unix(entry.Attributes.Mtime, 0),
			Crtime:   time.Unix(entry.Attributes.Crtime, 0),
			Mime:     entry.Attributes.Mime,
		},
		Chunks: entry.Chunks,
	}
	sourceETag := filer.ETagEntry(filerEntry)
	// Check X-Amz-Copy-Source-If-Match
	if ifMatch := r.Header.Get(s3_constants.AmzCopySourceIfMatch); ifMatch != "" {
		// Remove quotes if present (clients may send quoted or unquoted ETags).
		// Note: sourceETag stays trimmed for the subsequent checks as well.
		ifMatch = strings.Trim(ifMatch, `"`)
		sourceETag = strings.Trim(sourceETag, `"`)
		glog.V(3).Infof("CopyObjectHandler: If-Match check - expected %s, got %s", ifMatch, sourceETag)
		if ifMatch != sourceETag {
			glog.V(3).Infof("CopyObjectHandler: If-Match failed - expected %s, got %s", ifMatch, sourceETag)
			return s3err.ErrPreconditionFailed
		}
	}
	// Check X-Amz-Copy-Source-If-None-Match
	if ifNoneMatch := r.Header.Get(s3_constants.AmzCopySourceIfNoneMatch); ifNoneMatch != "" {
		// Remove quotes if present
		ifNoneMatch = strings.Trim(ifNoneMatch, `"`)
		sourceETag = strings.Trim(sourceETag, `"`)
		glog.V(3).Infof("CopyObjectHandler: If-None-Match check - comparing %s with %s", ifNoneMatch, sourceETag)
		// Fails when the ETag DOES match.
		if ifNoneMatch == sourceETag {
			glog.V(3).Infof("CopyObjectHandler: If-None-Match failed - matched %s", sourceETag)
			return s3err.ErrPreconditionFailed
		}
	}
	// Check X-Amz-Copy-Source-If-Modified-Since
	// NOTE(review): only time.RFC1123 is accepted; HTTP also permits other
	// historic date formats — confirm this matches client behavior.
	if ifModifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfModifiedSince); ifModifiedSince != "" {
		t, err := time.Parse(time.RFC1123, ifModifiedSince)
		if err != nil {
			glog.V(3).Infof("CopyObjectHandler: Invalid If-Modified-Since header: %v", err)
			return s3err.ErrInvalidRequest
		}
		// Fails when the object has NOT been modified after t.
		if !time.Unix(entry.Attributes.Mtime, 0).After(t) {
			glog.V(3).Infof("CopyObjectHandler: If-Modified-Since failed")
			return s3err.ErrPreconditionFailed
		}
	}
	// Check X-Amz-Copy-Source-If-Unmodified-Since
	if ifUnmodifiedSince := r.Header.Get(s3_constants.AmzCopySourceIfUnmodifiedSince); ifUnmodifiedSince != "" {
		t, err := time.Parse(time.RFC1123, ifUnmodifiedSince)
		if err != nil {
			glog.V(3).Infof("CopyObjectHandler: Invalid If-Unmodified-Since header: %v", err)
			return s3err.ErrInvalidRequest
		}
		// Fails when the object HAS been modified after t.
		if time.Unix(entry.Attributes.Mtime, 0).After(t) {
			glog.V(3).Infof("CopyObjectHandler: If-Unmodified-Since failed")
			return s3err.ErrPreconditionFailed
		}
	}
	return s3err.ErrNone
}
// createDestinationChunk creates a new chunk based on the source chunk with modified properties.
// FileId and Fid are intentionally left unset; they are filled in later by
// setChunkFileId once a destination volume has been assigned. SseType and
// SseMetadata are not carried over either — callers that copy encrypted
// chunks set them explicitly after re-encryption.
func (s3a *S3ApiServer) createDestinationChunk(sourceChunk *filer_pb.FileChunk, offset int64, size uint64) *filer_pb.FileChunk {
	return &filer_pb.FileChunk{
		Offset:       offset,
		Size:         size,
		ModifiedTsNs: time.Now().UnixNano(), // stamp the copy time, not the source's
		ETag:         sourceChunk.ETag,
		IsCompressed: sourceChunk.IsCompressed,
		CipherKey:    sourceChunk.CipherKey,
	}
}
  884. // lookupVolumeUrl looks up the volume URL for a given file ID using the filer's LookupVolume method
  885. func (s3a *S3ApiServer) lookupVolumeUrl(fileId string) (string, error) {
  886. var srcUrl string
  887. err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  888. vid, _, err := operation.ParseFileId(fileId)
  889. if err != nil {
  890. return fmt.Errorf("parse file ID: %w", err)
  891. }
  892. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  893. VolumeIds: []string{vid},
  894. })
  895. if err != nil {
  896. return fmt.Errorf("lookup volume: %w", err)
  897. }
  898. if locations, found := resp.LocationsMap[vid]; found && len(locations.Locations) > 0 {
  899. srcUrl = "http://" + locations.Locations[0].Url + "/" + fileId
  900. } else {
  901. return fmt.Errorf("no location found for volume %s", vid)
  902. }
  903. return nil
  904. })
  905. if err != nil {
  906. return "", fmt.Errorf("lookup volume URL: %w", err)
  907. }
  908. return srcUrl, nil
  909. }
  910. // setChunkFileId sets the file ID on the destination chunk
  911. func (s3a *S3ApiServer) setChunkFileId(chunk *filer_pb.FileChunk, assignResult *filer_pb.AssignVolumeResponse) error {
  912. chunk.FileId = assignResult.FileId
  913. fid, err := filer_pb.ToFileIdObject(assignResult.FileId)
  914. if err != nil {
  915. return fmt.Errorf("parse file ID: %w", err)
  916. }
  917. chunk.Fid = fid
  918. return nil
  919. }
  920. // prepareChunkCopy prepares a chunk for copying by assigning a new volume and looking up the source URL
  921. func (s3a *S3ApiServer) prepareChunkCopy(sourceFileId, dstPath string) (*filer_pb.AssignVolumeResponse, string, error) {
  922. // Assign new volume
  923. assignResult, err := s3a.assignNewVolume(dstPath)
  924. if err != nil {
  925. return nil, "", fmt.Errorf("assign volume: %w", err)
  926. }
  927. // Look up source URL
  928. srcUrl, err := s3a.lookupVolumeUrl(sourceFileId)
  929. if err != nil {
  930. return nil, "", fmt.Errorf("lookup source URL: %w", err)
  931. }
  932. return assignResult, srcUrl, nil
  933. }
  934. // uploadChunkData uploads chunk data to the destination using common upload logic
  935. func (s3a *S3ApiServer) uploadChunkData(chunkData []byte, assignResult *filer_pb.AssignVolumeResponse) error {
  936. dstUrl := fmt.Sprintf("http://%s/%s", assignResult.Location.Url, assignResult.FileId)
  937. uploadOption := &operation.UploadOption{
  938. UploadUrl: dstUrl,
  939. Cipher: false,
  940. IsInputCompressed: false,
  941. MimeType: "",
  942. PairMap: nil,
  943. Jwt: security.EncodedJwt(assignResult.Auth),
  944. }
  945. uploader, err := operation.NewUploader()
  946. if err != nil {
  947. return fmt.Errorf("create uploader: %w", err)
  948. }
  949. _, err = uploader.UploadData(context.Background(), chunkData, uploadOption)
  950. if err != nil {
  951. return fmt.Errorf("upload chunk: %w", err)
  952. }
  953. return nil
  954. }
  955. // downloadChunkData downloads chunk data from the source URL
  956. func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]byte, error) {
  957. var chunkData []byte
  958. shouldRetry, err := util_http.ReadUrlAsStream(context.Background(), srcUrl, nil, false, false, offset, int(size), func(data []byte) {
  959. chunkData = append(chunkData, data...)
  960. })
  961. if err != nil {
  962. return nil, fmt.Errorf("download chunk: %w", err)
  963. }
  964. if shouldRetry {
  965. return nil, fmt.Errorf("download chunk: retry needed")
  966. }
  967. return chunkData, nil
  968. }
// copyMultipartSSECChunks handles copying multipart SSE-C objects
// Returns chunks and destination metadata that should be applied to the destination entry.
//
// Every SSE-C chunk is decrypted with the source key and re-encrypted with
// the destination key (per-chunk SSE metadata is rebuilt in the process);
// chunks that are not SSE-C are copied verbatim. The returned metadata
// carries the first chunk's IV plus the SSE-C algorithm/key-MD5 headers so
// the copied object can also be read through the single-part code path.
func (s3a *S3ApiServer) copyMultipartSSECChunks(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// NOTE(review): Infof at default verbosity on every copy is noisy;
	// consider demoting these to glog.V(2) like the message further down.
	glog.Infof("copyMultipartSSECChunks called: copySourceKey=%v, destKey=%v, path=%s", copySourceKey != nil, destKey != nil, dstPath)
	// The key MD5s below are gathered only for the comparison log line.
	var sourceKeyMD5, destKeyMD5 string
	if copySourceKey != nil {
		sourceKeyMD5 = copySourceKey.KeyMD5
	}
	if destKey != nil {
		destKeyMD5 = destKey.KeyMD5
	}
	glog.Infof("Key MD5 comparison: source=%s, dest=%s, equal=%t", sourceKeyMD5, destKeyMD5, sourceKeyMD5 == destKeyMD5)
	// For multipart SSE-C, always use decrypt/reencrypt path to ensure proper metadata handling
	// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing
	glog.Infof("Taking multipart SSE-C reencrypt path to preserve metadata: %s", dstPath)
	// Different keys or key changes: decrypt and re-encrypt each chunk individually
	glog.V(2).Infof("Multipart SSE-C reencrypt copy (different keys): %s", dstPath)
	var dstChunks []*filer_pb.FileChunk
	var destIV []byte
	for _, chunk := range entry.GetChunks() {
		if chunk.GetSseType() != filer_pb.SSEType_SSE_C {
			// Non-SSE-C chunk, copy directly
			copiedChunk, err := s3a.copySingleChunk(chunk, dstPath)
			if err != nil {
				return nil, nil, fmt.Errorf("failed to copy non-SSE-C chunk: %w", err)
			}
			dstChunks = append(dstChunks, copiedChunk)
			continue
		}
		// SSE-C chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key
		copiedChunk, chunkDestIV, err := s3a.copyMultipartSSECChunk(chunk, copySourceKey, destKey, dstPath)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to copy SSE-C chunk %s: %w", chunk.GetFileIdString(), err)
		}
		dstChunks = append(dstChunks, copiedChunk)
		// Store the first chunk's IV as the object's IV (for single-part compatibility)
		if len(destIV) == 0 {
			destIV = chunkDestIV
		}
	}
	// Create destination metadata
	dstMetadata := make(map[string][]byte)
	if destKey != nil && len(destIV) > 0 {
		// Store the IV and SSE-C headers for single-part compatibility
		StoreIVInMetadata(dstMetadata, destIV)
		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
		dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)
		glog.V(2).Infof("Prepared multipart SSE-C destination metadata: %s", dstPath)
	}
	return dstChunks, dstMetadata, nil
}
// copyMultipartSSEKMSChunks handles copying multipart SSE-KMS objects (unified with SSE-C approach)
// Returns chunks and destination metadata that should be applied to the destination entry.
//
// Each SSE-KMS chunk is decrypted with its stored per-chunk metadata and
// re-encrypted with destKeyID; non-SSE-KMS chunks are copied verbatim. When a
// destination key is given, object-level serialized SSE-KMS metadata is also
// produced so the object can be read through the single-part GET path.
func (s3a *S3ApiServer) copyMultipartSSEKMSChunks(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// NOTE(review): Infof at default verbosity on every copy is noisy; consider glog.V(2).
	glog.Infof("copyMultipartSSEKMSChunks called: destKeyID=%s, path=%s", destKeyID, dstPath)
	// For multipart SSE-KMS, always use decrypt/reencrypt path to ensure proper metadata handling
	// The standard copyChunks() doesn't preserve SSE metadata, so we need per-chunk processing
	glog.Infof("Taking multipart SSE-KMS reencrypt path to preserve metadata: %s", dstPath)
	var dstChunks []*filer_pb.FileChunk
	for _, chunk := range entry.GetChunks() {
		if chunk.GetSseType() != filer_pb.SSEType_SSE_KMS {
			// Non-SSE-KMS chunk, copy directly
			copiedChunk, err := s3a.copySingleChunk(chunk, dstPath)
			if err != nil {
				return nil, nil, fmt.Errorf("failed to copy non-SSE-KMS chunk: %w", err)
			}
			dstChunks = append(dstChunks, copiedChunk)
			continue
		}
		// SSE-KMS chunk: decrypt with stored per-chunk metadata, re-encrypt with dest key
		copiedChunk, err := s3a.copyMultipartSSEKMSChunk(chunk, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to copy SSE-KMS chunk %s: %w", chunk.GetFileIdString(), err)
		}
		dstChunks = append(dstChunks, copiedChunk)
	}
	// Create destination metadata for SSE-KMS
	dstMetadata := make(map[string][]byte)
	if destKeyID != "" {
		// Store SSE-KMS metadata for single-part compatibility
		if encryptionContext == nil {
			encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
		}
		sseKey := &SSEKMSKey{
			KeyID:             destKeyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		// Serialization failure is logged but not fatal: the chunks
		// themselves already carry per-chunk metadata.
		if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil {
			dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
			glog.Infof("Created object-level KMS metadata for GET compatibility")
		} else {
			glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
		}
	}
	return dstChunks, dstMetadata, nil
}
  1066. // copyMultipartSSEKMSChunk copies a single SSE-KMS chunk from a multipart object (unified with SSE-C approach)
  1067. func (s3a *S3ApiServer) copyMultipartSSEKMSChunk(chunk *filer_pb.FileChunk, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) {
  1068. // Create destination chunk
  1069. dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
  1070. // Prepare chunk copy (assign new volume and get source URL)
  1071. assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
  1072. if err != nil {
  1073. return nil, err
  1074. }
  1075. // Set file ID on destination chunk
  1076. if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
  1077. return nil, err
  1078. }
  1079. // Download encrypted chunk data
  1080. encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
  1081. if err != nil {
  1082. return nil, fmt.Errorf("download encrypted chunk data: %w", err)
  1083. }
  1084. var finalData []byte
  1085. // Decrypt source data using stored SSE-KMS metadata (same pattern as SSE-C)
  1086. if len(chunk.GetSseMetadata()) == 0 {
  1087. return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata")
  1088. }
  1089. // Deserialize the SSE-KMS metadata (reusing unified metadata structure)
  1090. sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
  1091. if err != nil {
  1092. return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
  1093. }
  1094. // Decrypt the chunk data using the source metadata
  1095. decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey)
  1096. if decErr != nil {
  1097. return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr)
  1098. }
  1099. decryptedData, readErr := io.ReadAll(decryptedReader)
  1100. if readErr != nil {
  1101. return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
  1102. }
  1103. finalData = decryptedData
  1104. glog.V(4).Infof("Decrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(encryptedData), len(finalData))
  1105. // Re-encrypt with destination key if specified
  1106. if destKeyID != "" {
  1107. // Build encryption context if not provided
  1108. if encryptionContext == nil {
  1109. encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
  1110. }
  1111. // Encrypt with destination key
  1112. encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled)
  1113. if encErr != nil {
  1114. return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr)
  1115. }
  1116. reencryptedData, readErr := io.ReadAll(encryptedReader)
  1117. if readErr != nil {
  1118. return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
  1119. }
  1120. finalData = reencryptedData
  1121. // Create per-chunk SSE-KMS metadata for the destination chunk
  1122. // For copy operations, reset chunk offset to 0 (similar to SSE-C approach)
  1123. // The copied chunks form a new object structure independent of original part boundaries
  1124. destSSEKey.ChunkOffset = 0
  1125. kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey)
  1126. if err != nil {
  1127. return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err)
  1128. }
  1129. // Set the SSE type and metadata on destination chunk (unified approach)
  1130. dstChunk.SseType = filer_pb.SSEType_SSE_KMS
  1131. dstChunk.SseMetadata = kmsMetadata
  1132. glog.V(4).Infof("Re-encrypted multipart SSE-KMS chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData))
  1133. }
  1134. // Upload the final data
  1135. if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
  1136. return nil, fmt.Errorf("upload chunk data: %w", err)
  1137. }
  1138. // Update chunk size
  1139. dstChunk.Size = uint64(len(finalData))
  1140. glog.V(3).Infof("Successfully copied multipart SSE-KMS chunk %s → %s",
  1141. chunk.GetFileIdString(), dstChunk.GetFileIdString())
  1142. return dstChunk, nil
  1143. }
// copyMultipartSSECChunk copies a single SSE-C chunk from a multipart object.
// The chunk is downloaded, decrypted with copySourceKey using the per-chunk
// IV stored in the chunk's SSE metadata, re-encrypted with destKey when one
// is given, and uploaded to a freshly assigned volume. Returns the
// destination chunk and the IV used for re-encryption (nil when destKey is
// nil).
func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, []byte, error) {
	// Create destination chunk
	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
	// Prepare chunk copy (assign new volume and get source URL)
	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
	if err != nil {
		return nil, nil, err
	}
	// Set file ID on destination chunk
	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
		return nil, nil, err
	}
	// Download encrypted chunk data
	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
	if err != nil {
		return nil, nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}
	var finalData []byte
	var destIV []byte
	// Decrypt if source is encrypted
	if copySourceKey != nil {
		// Get the per-chunk SSE-C metadata
		if len(chunk.GetSseMetadata()) == 0 {
			return nil, nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata")
		}
		// Deserialize the SSE-C metadata
		ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
		if err != nil {
			return nil, nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
		}
		// Decode the IV from the metadata
		chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to decode chunk IV: %w", err)
		}
		// Calculate the correct IV for this chunk using within-part offset
		// (chunks past the start of a part use a counter-offset IV).
		var chunkIV []byte
		if ssecMetadata.PartOffset > 0 {
			chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset)
		} else {
			chunkIV = chunkBaseIV
		}
		// Decrypt the chunk data
		decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey, chunkIV)
		if decErr != nil {
			return nil, nil, fmt.Errorf("create decrypted reader: %w", decErr)
		}
		decryptedData, readErr := io.ReadAll(decryptedReader)
		if readErr != nil {
			return nil, nil, fmt.Errorf("decrypt chunk data: %w", readErr)
		}
		finalData = decryptedData
		glog.V(4).Infof("Decrypted multipart SSE-C chunk: %d bytes → %d bytes", len(encryptedData), len(finalData))
	} else {
		// Source is unencrypted
		finalData = encryptedData
	}
	// Re-encrypt if destination should be encrypted
	if destKey != nil {
		// Generate new IV for this chunk
		// NOTE(review): this generated IV is dead code — destIV is
		// overwritten just below with the IV returned by
		// CreateSSECEncryptedReader, so the rand.Read result is never used.
		// Left in place pending confirmation that crypto/rand has no other
		// use in this file.
		newIV := make([]byte, s3_constants.AESBlockSize)
		if _, err := rand.Read(newIV); err != nil {
			return nil, nil, fmt.Errorf("generate IV: %w", err)
		}
		destIV = newIV
		// Encrypt with new key and IV
		encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey)
		if encErr != nil {
			return nil, nil, fmt.Errorf("create encrypted reader: %w", encErr)
		}
		// The reader generates its own IV; that is the one actually used.
		destIV = iv
		reencryptedData, readErr := io.ReadAll(encryptedReader)
		if readErr != nil {
			return nil, nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
		}
		finalData = reencryptedData
		// Create per-chunk SSE-C metadata for the destination chunk
		ssecMetadata, err := SerializeSSECMetadata(destIV, destKey.KeyMD5, 0) // partOffset=0 for copied chunks
		if err != nil {
			return nil, nil, fmt.Errorf("serialize SSE-C metadata: %w", err)
		}
		// Set the SSE type and metadata on destination chunk
		dstChunk.SseType = filer_pb.SSEType_SSE_C
		dstChunk.SseMetadata = ssecMetadata // Use unified metadata field
		// NOTE(review): the size expression below collapses to len(finalData)
		// (finalData == reencryptedData here); it was presumably meant to show
		// plaintext vs ciphertext sizes — confirm and fix the log.
		glog.V(4).Infof("Re-encrypted multipart SSE-C chunk: %d bytes → %d bytes", len(finalData)-len(reencryptedData)+len(finalData), len(finalData))
	}
	// Upload the final data
	if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
		return nil, nil, fmt.Errorf("upload chunk data: %w", err)
	}
	// Update chunk size to reflect the (possibly re-encrypted) payload
	dstChunk.Size = uint64(len(finalData))
	glog.V(3).Infof("Successfully copied multipart SSE-C chunk %s → %s",
		chunk.GetFileIdString(), dstChunk.GetFileIdString())
	return dstChunk, destIV, nil
}
  1241. // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
  1242. // This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
  1243. func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
  1244. glog.Infof("copyMultipartCrossEncryption called: %s→%s, path=%s",
  1245. s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, false),
  1246. s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, false), dstPath)
  1247. var dstChunks []*filer_pb.FileChunk
  1248. // Parse destination encryption parameters
  1249. var destSSECKey *SSECustomerKey
  1250. var destKMSKeyID string
  1251. var destKMSEncryptionContext map[string]string
  1252. var destKMSBucketKeyEnabled bool
  1253. if state.DstSSEC {
  1254. var err error
  1255. destSSECKey, err = ParseSSECHeaders(r)
  1256. if err != nil {
  1257. return nil, nil, fmt.Errorf("failed to parse destination SSE-C headers: %w", err)
  1258. }
  1259. glog.Infof("Destination SSE-C: keyMD5=%s", destSSECKey.KeyMD5)
  1260. } else if state.DstSSEKMS {
  1261. var err error
  1262. destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, err = ParseSSEKMSCopyHeaders(r)
  1263. if err != nil {
  1264. return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
  1265. }
  1266. glog.Infof("Destination SSE-KMS: keyID=%s, bucketKey=%t", destKMSKeyID, destKMSBucketKeyEnabled)
  1267. } else {
  1268. glog.Infof("Destination: Unencrypted")
  1269. }
  1270. // Parse source encryption parameters
  1271. var sourceSSECKey *SSECustomerKey
  1272. if state.SrcSSEC {
  1273. var err error
  1274. sourceSSECKey, err = ParseSSECCopySourceHeaders(r)
  1275. if err != nil {
  1276. return nil, nil, fmt.Errorf("failed to parse source SSE-C headers: %w", err)
  1277. }
  1278. glog.Infof("Source SSE-C: keyMD5=%s", sourceSSECKey.KeyMD5)
  1279. }
  1280. // Process each chunk with unified cross-encryption logic
  1281. for _, chunk := range entry.GetChunks() {
  1282. var copiedChunk *filer_pb.FileChunk
  1283. var err error
  1284. if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
  1285. copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
  1286. } else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
  1287. copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
  1288. } else {
  1289. // Unencrypted chunk, copy directly
  1290. copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
  1291. }
  1292. if err != nil {
  1293. return nil, nil, fmt.Errorf("failed to copy chunk %s: %w", chunk.GetFileIdString(), err)
  1294. }
  1295. dstChunks = append(dstChunks, copiedChunk)
  1296. }
  1297. // Create destination metadata based on destination encryption type
  1298. dstMetadata := make(map[string][]byte)
  1299. // Clear any previous encryption metadata to avoid routing conflicts
  1300. if state.SrcSSEKMS && state.DstSSEC {
  1301. // SSE-KMS → SSE-C: Remove SSE-KMS headers
  1302. // These will be excluded from dstMetadata, effectively removing them
  1303. } else if state.SrcSSEC && state.DstSSEKMS {
  1304. // SSE-C → SSE-KMS: Remove SSE-C headers
  1305. // These will be excluded from dstMetadata, effectively removing them
  1306. } else if !state.DstSSEC && !state.DstSSEKMS {
  1307. // Encrypted → Unencrypted: Remove all encryption metadata
  1308. // These will be excluded from dstMetadata, effectively removing them
  1309. }
  1310. if state.DstSSEC && destSSECKey != nil {
  1311. // For SSE-C destination, use first chunk's IV for compatibility
  1312. if len(dstChunks) > 0 && dstChunks[0].GetSseType() == filer_pb.SSEType_SSE_C && len(dstChunks[0].GetSseMetadata()) > 0 {
  1313. if ssecMetadata, err := DeserializeSSECMetadata(dstChunks[0].GetSseMetadata()); err == nil {
  1314. if iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV); ivErr == nil {
  1315. StoreIVInMetadata(dstMetadata, iv)
  1316. dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
  1317. dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destSSECKey.KeyMD5)
  1318. glog.Infof("Created SSE-C object-level metadata from first chunk")
  1319. }
  1320. }
  1321. }
  1322. } else if state.DstSSEKMS && destKMSKeyID != "" {
  1323. // For SSE-KMS destination, create object-level metadata
  1324. if destKMSEncryptionContext == nil {
  1325. destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled)
  1326. }
  1327. sseKey := &SSEKMSKey{
  1328. KeyID: destKMSKeyID,
  1329. EncryptionContext: destKMSEncryptionContext,
  1330. BucketKeyEnabled: destKMSBucketKeyEnabled,
  1331. }
  1332. if kmsMetadata, serErr := SerializeSSEKMSMetadata(sseKey); serErr == nil {
  1333. dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
  1334. glog.Infof("Created SSE-KMS object-level metadata")
  1335. } else {
  1336. glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
  1337. }
  1338. }
  1339. // For unencrypted destination, no metadata needed (dstMetadata remains empty)
  1340. return dstChunks, dstMetadata, nil
  1341. }
  1342. // copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
  1343. func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
  1344. // Create destination chunk
  1345. dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
  1346. // Prepare chunk copy (assign new volume and get source URL)
  1347. assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
  1348. if err != nil {
  1349. return nil, err
  1350. }
  1351. // Set file ID on destination chunk
  1352. if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
  1353. return nil, err
  1354. }
  1355. // Download encrypted chunk data
  1356. encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
  1357. if err != nil {
  1358. return nil, fmt.Errorf("download encrypted chunk data: %w", err)
  1359. }
  1360. var finalData []byte
  1361. // Step 1: Decrypt source data
  1362. if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
  1363. // Decrypt SSE-C source
  1364. if len(chunk.GetSseMetadata()) == 0 {
  1365. return nil, fmt.Errorf("SSE-C chunk missing per-chunk metadata")
  1366. }
  1367. ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
  1368. if err != nil {
  1369. return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
  1370. }
  1371. chunkBaseIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
  1372. if err != nil {
  1373. return nil, fmt.Errorf("failed to decode chunk IV: %w", err)
  1374. }
  1375. // Calculate the correct IV for this chunk using within-part offset
  1376. var chunkIV []byte
  1377. if ssecMetadata.PartOffset > 0 {
  1378. chunkIV = calculateIVWithOffset(chunkBaseIV, ssecMetadata.PartOffset)
  1379. } else {
  1380. chunkIV = chunkBaseIV
  1381. }
  1382. decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), sourceSSECKey, chunkIV)
  1383. if decErr != nil {
  1384. return nil, fmt.Errorf("create SSE-C decrypted reader: %w", decErr)
  1385. }
  1386. decryptedData, readErr := io.ReadAll(decryptedReader)
  1387. if readErr != nil {
  1388. return nil, fmt.Errorf("decrypt SSE-C chunk data: %w", readErr)
  1389. }
  1390. finalData = decryptedData
  1391. previewLen := 16
  1392. if len(finalData) < previewLen {
  1393. previewLen = len(finalData)
  1394. }
  1395. } else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
  1396. // Decrypt SSE-KMS source
  1397. if len(chunk.GetSseMetadata()) == 0 {
  1398. return nil, fmt.Errorf("SSE-KMS chunk missing per-chunk metadata")
  1399. }
  1400. sourceSSEKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
  1401. if err != nil {
  1402. return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
  1403. }
  1404. decryptedReader, decErr := CreateSSEKMSDecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey)
  1405. if decErr != nil {
  1406. return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", decErr)
  1407. }
  1408. decryptedData, readErr := io.ReadAll(decryptedReader)
  1409. if readErr != nil {
  1410. return nil, fmt.Errorf("decrypt SSE-KMS chunk data: %w", readErr)
  1411. }
  1412. finalData = decryptedData
  1413. previewLen := 16
  1414. if len(finalData) < previewLen {
  1415. previewLen = len(finalData)
  1416. }
  1417. } else {
  1418. // Source is unencrypted
  1419. finalData = encryptedData
  1420. }
  1421. // Step 2: Re-encrypt with destination encryption (if any)
  1422. if state.DstSSEC && destSSECKey != nil {
  1423. // Encrypt with SSE-C
  1424. encryptedReader, iv, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destSSECKey)
  1425. if encErr != nil {
  1426. return nil, fmt.Errorf("create SSE-C encrypted reader: %w", encErr)
  1427. }
  1428. reencryptedData, readErr := io.ReadAll(encryptedReader)
  1429. if readErr != nil {
  1430. return nil, fmt.Errorf("re-encrypt with SSE-C: %w", readErr)
  1431. }
  1432. finalData = reencryptedData
  1433. // Create per-chunk SSE-C metadata (offset=0 for cross-encryption copies)
  1434. ssecMetadata, err := SerializeSSECMetadata(iv, destSSECKey.KeyMD5, 0)
  1435. if err != nil {
  1436. return nil, fmt.Errorf("serialize SSE-C metadata: %w", err)
  1437. }
  1438. dstChunk.SseType = filer_pb.SSEType_SSE_C
  1439. dstChunk.SseMetadata = ssecMetadata
  1440. previewLen := 16
  1441. if len(finalData) < previewLen {
  1442. previewLen = len(finalData)
  1443. }
  1444. } else if state.DstSSEKMS && destKMSKeyID != "" {
  1445. // Encrypt with SSE-KMS
  1446. if destKMSEncryptionContext == nil {
  1447. destKMSEncryptionContext = BuildEncryptionContext(dstBucket, dstPath, destKMSBucketKeyEnabled)
  1448. }
  1449. encryptedReader, destSSEKey, encErr := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled)
  1450. if encErr != nil {
  1451. return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", encErr)
  1452. }
  1453. reencryptedData, readErr := io.ReadAll(encryptedReader)
  1454. if readErr != nil {
  1455. return nil, fmt.Errorf("re-encrypt with SSE-KMS: %w", readErr)
  1456. }
  1457. finalData = reencryptedData
  1458. // Create per-chunk SSE-KMS metadata (offset=0 for cross-encryption copies)
  1459. destSSEKey.ChunkOffset = 0
  1460. kmsMetadata, err := SerializeSSEKMSMetadata(destSSEKey)
  1461. if err != nil {
  1462. return nil, fmt.Errorf("serialize SSE-KMS metadata: %w", err)
  1463. }
  1464. dstChunk.SseType = filer_pb.SSEType_SSE_KMS
  1465. dstChunk.SseMetadata = kmsMetadata
  1466. glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
  1467. }
  1468. // For unencrypted destination, finalData remains as decrypted plaintext
  1469. // Upload the final data
  1470. if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
  1471. return nil, fmt.Errorf("upload chunk data: %w", err)
  1472. }
  1473. // Update chunk size
  1474. dstChunk.Size = uint64(len(finalData))
  1475. glog.V(3).Infof("Successfully copied cross-encryption chunk %s → %s",
  1476. chunk.GetFileIdString(), dstChunk.GetFileIdString())
  1477. return dstChunk, nil
  1478. }
  1479. // getEncryptionTypeString returns a string representation of encryption type for logging
  1480. func (s3a *S3ApiServer) getEncryptionTypeString(isSSEC, isSSEKMS, isSSES3 bool) string {
  1481. if isSSEC {
  1482. return s3_constants.SSETypeC
  1483. } else if isSSEKMS {
  1484. return s3_constants.SSETypeKMS
  1485. } else if isSSES3 {
  1486. return s3_constants.SSETypeS3
  1487. }
  1488. return "Plain"
  1489. }
  1490. // copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection
  1491. // Returns chunks and destination metadata that should be applied to the destination entry
  1492. func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, map[string][]byte, error) {
  1493. glog.Infof("copyChunksWithSSEC called for %s with %d chunks", r.URL.Path, len(entry.GetChunks()))
  1494. // Parse SSE-C headers
  1495. copySourceKey, err := ParseSSECCopySourceHeaders(r)
  1496. if err != nil {
  1497. glog.Errorf("Failed to parse SSE-C copy source headers: %v", err)
  1498. return nil, nil, err
  1499. }
  1500. destKey, err := ParseSSECHeaders(r)
  1501. if err != nil {
  1502. glog.Errorf("Failed to parse SSE-C headers: %v", err)
  1503. return nil, nil, err
  1504. }
  1505. // Check if this is a multipart SSE-C object
  1506. isMultipartSSEC := false
  1507. sseCChunks := 0
  1508. for i, chunk := range entry.GetChunks() {
  1509. glog.V(4).Infof("Chunk %d: sseType=%d, hasMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0)
  1510. if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
  1511. sseCChunks++
  1512. }
  1513. }
  1514. isMultipartSSEC = sseCChunks > 1
  1515. glog.Infof("SSE-C copy analysis: total chunks=%d, sseC chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseCChunks, isMultipartSSEC)
  1516. if isMultipartSSEC {
  1517. glog.V(2).Infof("Detected multipart SSE-C object with %d encrypted chunks for copy", sseCChunks)
  1518. return s3a.copyMultipartSSECChunks(entry, copySourceKey, destKey, r.URL.Path)
  1519. }
  1520. // Single-part SSE-C object: use original logic
  1521. // Determine copy strategy
  1522. strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey)
  1523. if err != nil {
  1524. return nil, nil, err
  1525. }
  1526. glog.V(2).Infof("SSE-C copy strategy for single-part %s: %v", r.URL.Path, strategy)
  1527. switch strategy {
  1528. case SSECCopyStrategyDirect:
  1529. // FAST PATH: Direct chunk copy
  1530. glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
  1531. chunks, err := s3a.copyChunks(entry, r.URL.Path)
  1532. return chunks, nil, err
  1533. case SSECCopyStrategyDecryptEncrypt:
  1534. // SLOW PATH: Decrypt and re-encrypt
  1535. glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
  1536. chunks, destIV, err := s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path)
  1537. if err != nil {
  1538. return nil, nil, err
  1539. }
  1540. // Create destination metadata with IV and SSE-C headers
  1541. dstMetadata := make(map[string][]byte)
  1542. if destKey != nil && len(destIV) > 0 {
  1543. // Store the IV
  1544. StoreIVInMetadata(dstMetadata, destIV)
  1545. // Store SSE-C algorithm and key MD5 for proper metadata
  1546. dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
  1547. dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)
  1548. glog.V(2).Infof("Prepared IV and SSE-C metadata for destination copy: %s", r.URL.Path)
  1549. }
  1550. return chunks, dstMetadata, nil
  1551. default:
  1552. return nil, nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy)
  1553. }
  1554. }
  1555. // copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination
  1556. // Returns the destination chunks and the IV used for encryption (if any)
  1557. func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, []byte, error) {
  1558. dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
  1559. const defaultChunkCopyConcurrency = 4
  1560. executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
  1561. errChan := make(chan error, len(entry.GetChunks()))
  1562. // Generate a single IV for the destination object (if destination is encrypted)
  1563. var destIV []byte
  1564. if destKey != nil {
  1565. destIV = make([]byte, s3_constants.AESBlockSize)
  1566. if _, err := io.ReadFull(rand.Reader, destIV); err != nil {
  1567. return nil, nil, fmt.Errorf("failed to generate destination IV: %w", err)
  1568. }
  1569. }
  1570. for i, chunk := range entry.GetChunks() {
  1571. chunkIndex := i
  1572. executor.Execute(func() {
  1573. dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath, entry.Extended, destIV)
  1574. if err != nil {
  1575. errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
  1576. return
  1577. }
  1578. dstChunks[chunkIndex] = dstChunk
  1579. errChan <- nil
  1580. })
  1581. }
  1582. // Wait for all operations to complete and check for errors
  1583. for i := 0; i < len(entry.GetChunks()); i++ {
  1584. if err := <-errChan; err != nil {
  1585. return nil, nil, err
  1586. }
  1587. }
  1588. return dstChunks, destIV, nil
  1589. }
// copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt.
// The source is decrypted with copySourceKey using the object-level IV from
// srcMetadata and the chunk's offset as the counter position; the plaintext is
// then re-encrypted with destKey using the caller-provided destIV and the same
// offset-based counter, so every chunk of the destination object shares one IV.
// Either key may be nil, which skips the corresponding step (unencrypted side).
func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string, srcMetadata map[string][]byte, destIV []byte) (*filer_pb.FileChunk, error) {
	// Create destination chunk
	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)

	// Prepare chunk copy (assign new volume and get source URL)
	assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
	if err != nil {
		return nil, err
	}

	// Set file ID on destination chunk
	if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
		return nil, err
	}

	// Download encrypted chunk data
	encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
	if err != nil {
		return nil, fmt.Errorf("download encrypted chunk data: %w", err)
	}

	var finalData []byte

	// Decrypt if source is encrypted
	if copySourceKey != nil {
		// Get the object-level IV from source metadata (single-part SSE-C layout)
		srcIV, err := GetIVFromMetadata(srcMetadata)
		if err != nil {
			return nil, fmt.Errorf("failed to get IV from metadata: %w", err)
		}

		// Use counter offset based on chunk position in the original object
		decryptedReader, decErr := CreateSSECDecryptedReaderWithOffset(bytes.NewReader(encryptedData), copySourceKey, srcIV, uint64(chunk.Offset))
		if decErr != nil {
			return nil, fmt.Errorf("create decrypted reader: %w", decErr)
		}

		decryptedData, readErr := io.ReadAll(decryptedReader)
		if readErr != nil {
			return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
		}
		finalData = decryptedData
	} else {
		// Source is unencrypted
		finalData = encryptedData
	}

	// Re-encrypt if destination should be encrypted
	if destKey != nil {
		// Use the provided destination IV with counter offset based on chunk position.
		// This ensures all chunks of the same object use the same IV with different counters.
		encryptedReader, encErr := CreateSSECEncryptedReaderWithOffset(bytes.NewReader(finalData), destKey, destIV, uint64(chunk.Offset))
		if encErr != nil {
			return nil, fmt.Errorf("create encrypted reader: %w", encErr)
		}

		reencryptedData, readErr := io.ReadAll(encryptedReader)
		if readErr != nil {
			return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
		}
		finalData = reencryptedData

		// Record the re-encrypted payload size on the destination chunk
		dstChunk.Size = uint64(len(finalData))
	}

	// Upload the processed data
	if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
		return nil, fmt.Errorf("upload processed chunk data: %w", err)
	}

	return dstChunk, nil
}
  1652. // copyChunksWithSSEKMS handles SSE-KMS aware copying with smart fast/slow path selection
  1653. // Returns chunks and destination metadata like SSE-C for consistency
  1654. func (s3a *S3ApiServer) copyChunksWithSSEKMS(entry *filer_pb.Entry, r *http.Request, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
  1655. glog.Infof("copyChunksWithSSEKMS called for %s with %d chunks", r.URL.Path, len(entry.GetChunks()))
  1656. // Parse SSE-KMS headers from copy request
  1657. destKeyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
  1658. if err != nil {
  1659. return nil, nil, err
  1660. }
  1661. // Check if this is a multipart SSE-KMS object
  1662. isMultipartSSEKMS := false
  1663. sseKMSChunks := 0
  1664. for i, chunk := range entry.GetChunks() {
  1665. glog.V(4).Infof("Chunk %d: sseType=%d, hasKMSMetadata=%t", i, chunk.GetSseType(), len(chunk.GetSseMetadata()) > 0)
  1666. if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
  1667. sseKMSChunks++
  1668. }
  1669. }
  1670. isMultipartSSEKMS = sseKMSChunks > 1
  1671. glog.Infof("SSE-KMS copy analysis: total chunks=%d, sseKMS chunks=%d, isMultipart=%t", len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS)
  1672. if isMultipartSSEKMS {
  1673. glog.V(2).Infof("Detected multipart SSE-KMS object with %d encrypted chunks for copy", sseKMSChunks)
  1674. return s3a.copyMultipartSSEKMSChunks(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket)
  1675. }
  1676. // Single-part SSE-KMS object: use existing logic
  1677. // If no SSE-KMS headers and source is not SSE-KMS encrypted, use regular copy
  1678. if destKeyID == "" && !IsSSEKMSEncrypted(entry.Extended) {
  1679. chunks, err := s3a.copyChunks(entry, r.URL.Path)
  1680. return chunks, nil, err
  1681. }
  1682. // Apply bucket default encryption if no explicit key specified
  1683. if destKeyID == "" {
  1684. bucketMetadata, err := s3a.getBucketMetadata(bucket)
  1685. if err != nil {
  1686. glog.V(2).Infof("Could not get bucket metadata for default encryption: %v", err)
  1687. } else if bucketMetadata != nil && bucketMetadata.Encryption != nil && bucketMetadata.Encryption.SseAlgorithm == "aws:kms" {
  1688. destKeyID = bucketMetadata.Encryption.KmsKeyId
  1689. bucketKeyEnabled = bucketMetadata.Encryption.BucketKeyEnabled
  1690. }
  1691. }
  1692. // Determine copy strategy
  1693. strategy, err := DetermineSSEKMSCopyStrategy(entry.Extended, destKeyID)
  1694. if err != nil {
  1695. return nil, nil, err
  1696. }
  1697. glog.V(2).Infof("SSE-KMS copy strategy for %s: %v", r.URL.Path, strategy)
  1698. switch strategy {
  1699. case SSEKMSCopyStrategyDirect:
  1700. // FAST PATH: Direct chunk copy (same key or both unencrypted)
  1701. glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
  1702. chunks, err := s3a.copyChunks(entry, r.URL.Path)
  1703. // For direct copy, generate destination metadata if we're encrypting to SSE-KMS
  1704. var dstMetadata map[string][]byte
  1705. if destKeyID != "" {
  1706. dstMetadata = make(map[string][]byte)
  1707. if encryptionContext == nil {
  1708. encryptionContext = BuildEncryptionContext(bucket, r.URL.Path, bucketKeyEnabled)
  1709. }
  1710. sseKey := &SSEKMSKey{
  1711. KeyID: destKeyID,
  1712. EncryptionContext: encryptionContext,
  1713. BucketKeyEnabled: bucketKeyEnabled,
  1714. }
  1715. if kmsMetadata, serializeErr := SerializeSSEKMSMetadata(sseKey); serializeErr == nil {
  1716. dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
  1717. glog.V(3).Infof("Generated SSE-KMS metadata for direct copy: keyID=%s", destKeyID)
  1718. } else {
  1719. glog.Errorf("Failed to serialize SSE-KMS metadata for direct copy: %v", serializeErr)
  1720. }
  1721. }
  1722. return chunks, dstMetadata, err
  1723. case SSEKMSCopyStrategyDecryptEncrypt:
  1724. // SLOW PATH: Decrypt source and re-encrypt for destination
  1725. glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
  1726. return s3a.copyChunksWithSSEKMSReencryption(entry, destKeyID, encryptionContext, bucketKeyEnabled, r.URL.Path, bucket)
  1727. default:
  1728. return nil, nil, fmt.Errorf("unknown SSE-KMS copy strategy: %v", strategy)
  1729. }
  1730. }
  1731. // copyChunksWithSSEKMSReencryption handles the slow path: decrypt source and re-encrypt for destination
  1732. // Returns chunks and destination metadata like SSE-C for consistency
  1733. func (s3a *S3ApiServer) copyChunksWithSSEKMSReencryption(entry *filer_pb.Entry, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
  1734. var dstChunks []*filer_pb.FileChunk
  1735. // Extract and deserialize source SSE-KMS metadata
  1736. var sourceSSEKey *SSEKMSKey
  1737. if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
  1738. var err error
  1739. sourceSSEKey, err = DeserializeSSEKMSMetadata(keyData)
  1740. if err != nil {
  1741. return nil, nil, fmt.Errorf("failed to deserialize source SSE-KMS metadata: %w", err)
  1742. }
  1743. glog.V(3).Infof("Extracted source SSE-KMS key: keyID=%s, bucketKey=%t", sourceSSEKey.KeyID, sourceSSEKey.BucketKeyEnabled)
  1744. }
  1745. // Process chunks
  1746. for _, chunk := range entry.GetChunks() {
  1747. dstChunk, err := s3a.copyChunkWithSSEKMSReencryption(chunk, sourceSSEKey, destKeyID, encryptionContext, bucketKeyEnabled, dstPath, bucket)
  1748. if err != nil {
  1749. return nil, nil, fmt.Errorf("copy chunk with SSE-KMS re-encryption: %w", err)
  1750. }
  1751. dstChunks = append(dstChunks, dstChunk)
  1752. }
  1753. // Generate destination metadata for SSE-KMS encryption (consistent with SSE-C pattern)
  1754. dstMetadata := make(map[string][]byte)
  1755. if destKeyID != "" {
  1756. // Build encryption context if not provided
  1757. if encryptionContext == nil {
  1758. encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
  1759. }
  1760. // Create SSE-KMS key structure for destination metadata
  1761. sseKey := &SSEKMSKey{
  1762. KeyID: destKeyID,
  1763. EncryptionContext: encryptionContext,
  1764. BucketKeyEnabled: bucketKeyEnabled,
  1765. // Note: EncryptedDataKey will be generated during actual encryption
  1766. // IV is also generated per chunk during encryption
  1767. }
  1768. // Serialize SSE-KMS metadata for storage
  1769. kmsMetadata, err := SerializeSSEKMSMetadata(sseKey)
  1770. if err != nil {
  1771. return nil, nil, fmt.Errorf("serialize destination SSE-KMS metadata: %w", err)
  1772. }
  1773. dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
  1774. glog.V(3).Infof("Generated destination SSE-KMS metadata: keyID=%s, bucketKey=%t", destKeyID, bucketKeyEnabled)
  1775. }
  1776. return dstChunks, dstMetadata, nil
  1777. }
  1778. // copyChunkWithSSEKMSReencryption copies a single chunk with SSE-KMS decrypt/re-encrypt
  1779. func (s3a *S3ApiServer) copyChunkWithSSEKMSReencryption(chunk *filer_pb.FileChunk, sourceSSEKey *SSEKMSKey, destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, dstPath, bucket string) (*filer_pb.FileChunk, error) {
  1780. // Create destination chunk
  1781. dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
  1782. // Prepare chunk copy (assign new volume and get source URL)
  1783. assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
  1784. if err != nil {
  1785. return nil, err
  1786. }
  1787. // Set file ID on destination chunk
  1788. if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
  1789. return nil, err
  1790. }
  1791. // Download chunk data
  1792. chunkData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
  1793. if err != nil {
  1794. return nil, fmt.Errorf("download chunk data: %w", err)
  1795. }
  1796. var finalData []byte
  1797. // Decrypt source data if it's SSE-KMS encrypted
  1798. if sourceSSEKey != nil {
  1799. // For SSE-KMS, the encrypted chunk data contains IV + encrypted content
  1800. // Use the source SSE key to decrypt the chunk data
  1801. decryptedReader, err := CreateSSEKMSDecryptedReader(bytes.NewReader(chunkData), sourceSSEKey)
  1802. if err != nil {
  1803. return nil, fmt.Errorf("create SSE-KMS decrypted reader: %w", err)
  1804. }
  1805. decryptedData, err := io.ReadAll(decryptedReader)
  1806. if err != nil {
  1807. return nil, fmt.Errorf("decrypt chunk data: %w", err)
  1808. }
  1809. finalData = decryptedData
  1810. glog.V(4).Infof("Decrypted chunk data: %d bytes → %d bytes", len(chunkData), len(finalData))
  1811. } else {
  1812. // Source is not SSE-KMS encrypted, use data as-is
  1813. finalData = chunkData
  1814. }
  1815. // Re-encrypt if destination should be SSE-KMS encrypted
  1816. if destKeyID != "" {
  1817. // Encryption context should already be provided by the caller
  1818. // But ensure we have a fallback for robustness
  1819. if encryptionContext == nil {
  1820. encryptionContext = BuildEncryptionContext(bucket, dstPath, bucketKeyEnabled)
  1821. }
  1822. encryptedReader, _, err := CreateSSEKMSEncryptedReaderWithBucketKey(bytes.NewReader(finalData), destKeyID, encryptionContext, bucketKeyEnabled)
  1823. if err != nil {
  1824. return nil, fmt.Errorf("create SSE-KMS encrypted reader: %w", err)
  1825. }
  1826. reencryptedData, err := io.ReadAll(encryptedReader)
  1827. if err != nil {
  1828. return nil, fmt.Errorf("re-encrypt chunk data: %w", err)
  1829. }
  1830. // Store original decrypted data size for logging
  1831. originalSize := len(finalData)
  1832. finalData = reencryptedData
  1833. glog.V(4).Infof("Re-encrypted chunk data: %d bytes → %d bytes", originalSize, len(finalData))
  1834. // Update chunk size to include IV and encryption overhead
  1835. dstChunk.Size = uint64(len(finalData))
  1836. }
  1837. // Upload the processed data
  1838. if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
  1839. return nil, fmt.Errorf("upload processed chunk data: %w", err)
  1840. }
  1841. glog.V(3).Infof("Successfully processed SSE-KMS chunk re-encryption: src_key=%s, dst_key=%s, size=%d→%d",
  1842. getKeyIDString(sourceSSEKey), destKeyID, len(chunkData), len(finalData))
  1843. return dstChunk, nil
  1844. }
  1845. // getKeyIDString safely gets the KeyID from an SSEKMSKey, handling nil cases
  1846. func getKeyIDString(key *SSEKMSKey) string {
  1847. if key == nil {
  1848. return "none"
  1849. }
  1850. if key.KeyID == "" {
  1851. return "default"
  1852. }
  1853. return key.KeyID
  1854. }
// EncryptionHeaderContext holds encryption type information and header classifications
// used when deciding which headers to carry over during an object copy.
type EncryptionHeaderContext struct {
	// Encryption types detected on the copy source and the copy destination.
	SrcSSEC, SrcSSEKMS, SrcSSES3 bool
	DstSSEC, DstSSEKMS, DstSSES3 bool
	// Classification of the single header key this context was built for.
	IsSSECHeader, IsSSEKMSHeader, IsSSES3Header bool
}
  1861. // newEncryptionHeaderContext creates a context for encryption header processing
  1862. func newEncryptionHeaderContext(headerKey string, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3 bool) *EncryptionHeaderContext {
  1863. return &EncryptionHeaderContext{
  1864. SrcSSEC: srcSSEC, SrcSSEKMS: srcSSEKMS, SrcSSES3: srcSSES3,
  1865. DstSSEC: dstSSEC, DstSSEKMS: dstSSEKMS, DstSSES3: dstSSES3,
  1866. IsSSECHeader: isSSECHeader(headerKey),
  1867. IsSSEKMSHeader: isSSEKMSHeader(headerKey, srcSSEKMS, dstSSEKMS),
  1868. IsSSES3Header: isSSES3Header(headerKey, srcSSES3, dstSSES3),
  1869. }
  1870. }
  1871. // isSSECHeader checks if the header is SSE-C specific
  1872. func isSSECHeader(headerKey string) bool {
  1873. return headerKey == s3_constants.AmzServerSideEncryptionCustomerAlgorithm ||
  1874. headerKey == s3_constants.AmzServerSideEncryptionCustomerKeyMD5 ||
  1875. headerKey == s3_constants.SeaweedFSSSEIV
  1876. }
  1877. // isSSEKMSHeader checks if the header is SSE-KMS specific
  1878. func isSSEKMSHeader(headerKey string, srcSSEKMS, dstSSEKMS bool) bool {
  1879. return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSEKMS || dstSSEKMS)) ||
  1880. headerKey == s3_constants.AmzServerSideEncryptionAwsKmsKeyId ||
  1881. headerKey == s3_constants.SeaweedFSSSEKMSKey ||
  1882. headerKey == s3_constants.SeaweedFSSSEKMSKeyID ||
  1883. headerKey == s3_constants.SeaweedFSSSEKMSEncryption ||
  1884. headerKey == s3_constants.SeaweedFSSSEKMSBucketKeyEnabled ||
  1885. headerKey == s3_constants.SeaweedFSSSEKMSEncryptionContext ||
  1886. headerKey == s3_constants.SeaweedFSSSEKMSBaseIV
  1887. }
  1888. // isSSES3Header checks if the header is SSE-S3 specific
  1889. func isSSES3Header(headerKey string, srcSSES3, dstSSES3 bool) bool {
  1890. return (headerKey == s3_constants.AmzServerSideEncryption && (srcSSES3 || dstSSES3)) ||
  1891. headerKey == s3_constants.SeaweedFSSSES3Key ||
  1892. headerKey == s3_constants.SeaweedFSSSES3Encryption ||
  1893. headerKey == s3_constants.SeaweedFSSSES3BaseIV ||
  1894. headerKey == s3_constants.SeaweedFSSSES3KeyData
  1895. }
  1896. // shouldSkipCrossEncryptionHeader handles cross-encryption copy scenarios
  1897. func (ctx *EncryptionHeaderContext) shouldSkipCrossEncryptionHeader() bool {
  1898. // SSE-C to SSE-KMS: skip SSE-C headers
  1899. if ctx.SrcSSEC && ctx.DstSSEKMS && ctx.IsSSECHeader {
  1900. return true
  1901. }
  1902. // SSE-KMS to SSE-C: skip SSE-KMS headers
  1903. if ctx.SrcSSEKMS && ctx.DstSSEC && ctx.IsSSEKMSHeader {
  1904. return true
  1905. }
  1906. // SSE-C to SSE-S3: skip SSE-C headers
  1907. if ctx.SrcSSEC && ctx.DstSSES3 && ctx.IsSSECHeader {
  1908. return true
  1909. }
  1910. // SSE-S3 to SSE-C: skip SSE-S3 headers
  1911. if ctx.SrcSSES3 && ctx.DstSSEC && ctx.IsSSES3Header {
  1912. return true
  1913. }
  1914. // SSE-KMS to SSE-S3: skip SSE-KMS headers
  1915. if ctx.SrcSSEKMS && ctx.DstSSES3 && ctx.IsSSEKMSHeader {
  1916. return true
  1917. }
  1918. // SSE-S3 to SSE-KMS: skip SSE-S3 headers
  1919. if ctx.SrcSSES3 && ctx.DstSSEKMS && ctx.IsSSES3Header {
  1920. return true
  1921. }
  1922. return false
  1923. }
  1924. // shouldSkipEncryptedToUnencryptedHeader handles encrypted to unencrypted copy scenarios
  1925. func (ctx *EncryptionHeaderContext) shouldSkipEncryptedToUnencryptedHeader() bool {
  1926. // Skip all encryption headers when copying from encrypted to unencrypted
  1927. hasSourceEncryption := ctx.SrcSSEC || ctx.SrcSSEKMS || ctx.SrcSSES3
  1928. hasDestinationEncryption := ctx.DstSSEC || ctx.DstSSEKMS || ctx.DstSSES3
  1929. isAnyEncryptionHeader := ctx.IsSSECHeader || ctx.IsSSEKMSHeader || ctx.IsSSES3Header
  1930. return hasSourceEncryption && !hasDestinationEncryption && isAnyEncryptionHeader
  1931. }
  1932. // shouldSkipEncryptionHeader determines if a header should be skipped when copying extended attributes
  1933. // based on the source and destination encryption types. This consolidates the repetitive logic for
  1934. // filtering encryption-related headers during copy operations.
  1935. func shouldSkipEncryptionHeader(headerKey string,
  1936. srcSSEC, srcSSEKMS, srcSSES3 bool,
  1937. dstSSEC, dstSSEKMS, dstSSES3 bool) bool {
  1938. // Create context to reduce complexity and improve testability
  1939. ctx := newEncryptionHeaderContext(headerKey, srcSSEC, srcSSEKMS, srcSSES3, dstSSEC, dstSSEKMS, dstSSES3)
  1940. // If it's not an encryption header, don't skip it
  1941. if !ctx.IsSSECHeader && !ctx.IsSSEKMSHeader && !ctx.IsSSES3Header {
  1942. return false
  1943. }
  1944. // Handle cross-encryption scenarios (different encryption types)
  1945. if ctx.shouldSkipCrossEncryptionHeader() {
  1946. return true
  1947. }
  1948. // Handle encrypted to unencrypted scenarios
  1949. if ctx.shouldSkipEncryptedToUnencryptedHeader() {
  1950. return true
  1951. }
  1952. // Default: don't skip the header
  1953. return false
  1954. }