command_ec_common.go

package shell

import (
	"context"
	"errors"
	"fmt"
	"math/rand/v2"
	"regexp"
	"slices"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)

type DataCenterId string
type EcNodeId string
type RackId string

type EcNode struct {
	info       *master_pb.DataNodeInfo
	dc         DataCenterId
	rack       RackId
	freeEcSlot int
}

type CandidateEcNode struct {
	ecNode     *EcNode
	shardCount int
}

type EcRack struct {
	ecNodes    map[EcNodeId]*EcNode
	freeEcSlot int
}

var (
	ecBalanceAlgorithmDescription = `
	func EcBalance() {
		for each collection:
			balanceEcVolumes(collectionName)
		for each rack:
			balanceEcRack(rack)
	}

	func balanceEcVolumes(collectionName){
		for each volume:
			doDeduplicateEcShards(volumeId)

		tracks rack~shardCount mapping
		for each volume:
			doBalanceEcShardsAcrossRacks(volumeId)

		for each volume:
			doBalanceEcShardsWithinRacks(volumeId)
	}

	// spread ec shards into more racks
	func doBalanceEcShardsAcrossRacks(volumeId){
		tracks rack~volumeIdShardCount mapping
		averageShardsPerEcRack = totalShardNumber / numRacks  // totalShardNumber is 14 for now, later could vary for each dc
		ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
		for each ecShardsToMove {
			destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
			destVolumeServers = volume servers on the destRack
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	func doBalanceEcShardsWithinRacks(volumeId){
		racks = collect all racks that the volume id is on
		for rack, shards := range racks
			doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
	}

	// move ec shards
	func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
		tracks volumeServer~volumeIdShardCount mapping
		averageShardCount = len(shards) / numVolumeServers
		volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
		ecShardsToMove = select overflown ec shards from volumeServersOverAverage
		for each ecShardsToMove {
			destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
			pickOneEcNodeAndMoveOneShard(destVolumeServers)
		}
	}

	// move ec shards while keeping shard distribution for the same volume unchanged or more even
	func balanceEcRack(rack){
		averageShardCount = total shards / numVolumeServers
		for hasMovedOneEcShard {
			sort all volume servers ordered by the number of local ec shards
			pick the volume server A with the lowest number of ec shards x
			pick the volume server B with the highest number of ec shards y
			if y > averageShardCount and x + 1 <= averageShardCount {
				if B has an ec shard with volume id v that A does not have {
					move one ec shard v from B to A
					hasMovedOneEcShard = true
				}
			}
		}
	}
	`

	// Overridable functions for testing.
	getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)

func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
	var resp *master_pb.GetMasterConfigurationResponse
	var err error

	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
		return err
	})
	if err != nil {
		return nil, err
	}

	return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}

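// parseReplicaPlacementArg resolves the replica placement to use for EC volumes:
// it parses replicaStr when one is given, otherwise it falls back to the
// master's default replication setting.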
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
	var rp *super_block.ReplicaPlacement
	var err error

	if replicaStr != "" {
		rp, err = super_block.NewReplicaPlacementFromString(replicaStr)
		if err != nil {
			return rp, err
		}
		fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
	} else {
		// No replica placement argument provided, resolve from master default settings.
		rp, err = getDefaultReplicaPlacement(commandEnv)
		if err != nil {
			return rp, err
		}
		fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
	}

	return rp, nil
}

func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
	if delayBeforeCollecting > 0 {
		time.Sleep(delayBeforeCollecting)
	}

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return
	}

	return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}

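// collectEcNodesForDC lists every volume server (EC node) in the selected data
// center, sorted by free EC shard slots in descending order, together with the
// total number of free EC shard slots.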
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	// list all possible locations
	// collect topology information
	topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return
	}

	// find out all volume servers with one slot left.
	ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
	sortEcNodesByFreeslotsDescending(ecNodes)

	return
}

func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
	return collectEcNodesForDC(commandEnv, "")
}

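// collectCollectionsForVolumeIds returns the sorted, de-duplicated set of
// collection names that own any of the given volume ids, scanning both normal
// volumes and EC shards in the topology.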
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
	if len(vids) == 0 {
		return nil
	}

	found := map[string]bool{}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				for _, diskInfo := range dn.DiskInfos {
					for _, vi := range diskInfo.VolumeInfos {
						for _, vid := range vids {
							if needle.VolumeId(vi.Id) == vid {
								found[vi.Collection] = true
							}
						}
					}
					for _, ecs := range diskInfo.EcShardInfos {
						for _, vid := range vids {
							if needle.VolumeId(ecs.Id) == vid {
								found[ecs.Collection] = true
							}
						}
					}
				}
			}
		}
	}
	if len(found) == 0 {
		return nil
	}

	collections := []string{}
	for k := range found {
		collections = append(collections, k)
	}
	sort.Strings(collections)
	return collections
}

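// moveMountedShardToEcNode moves one mounted EC shard from existingLocation to
// destinationEcNode. When applyBalancing is set it copies and mounts the shard
// on the destination, then unmounts and deletes it on the source; otherwise it
// only updates the in-memory bookkeeping.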
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
	if !commandEnv.isLocked() {
		return fmt.Errorf("lock is lost")
	}

	copiedShardIds := []uint32{uint32(shardId)}

	if applyBalancing {
		existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)

		// ask destination node to copy shard and the ecx file from source node, and mount it
		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
		if err != nil {
			return err
		}

		// unmount the to be deleted shards
		err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		// ask source node to delete the shard, and maybe the ecx file
		err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
		if err != nil {
			return err
		}

		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
	}

	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)

	return nil
}

func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
	targetServer *EcNode, shardIdsToCopy []uint32,
	volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {

	fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)

	targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
	err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {

		if targetAddress != existingLocation {
			fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
			_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       uint32(volumeId),
				Collection:     collection,
				ShardIds:       shardIdsToCopy,
				CopyEcxFile:    true,
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: string(existingLocation),
			})
			if copyErr != nil {
				return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
			}
		}

		fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   shardIdsToCopy,
		})
		if mountErr != nil {
			return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
		}

		if targetAddress != existingLocation {
			copiedShardIds = shardIdsToCopy
			glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
		}

		return nil
	})

	if err != nil {
		return
	}

	return
}

func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
			}
		}
	}
}

func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return b.freeEcSlot - a.freeEcSlot
	})
}

func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
	slices.SortFunc(ecNodes, func(a, b *EcNode) int {
		return a.freeEcSlot - b.freeEcSlot
	})
}

// ensureSortedEcNodes restores the sort order of data after the element at
// index has changed its count, bubbling it towards the front or the back as needed.
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swap(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swap(data, i, i-1)
		} else {
			break
		}
	}
}

func swap(data []*CandidateEcNode, i, j int) {
	t := data[i]
	data[i] = data[j]
	data[j] = t
}

func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
	for _, ecShardInfo := range ecShardInfos {
		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
		count += shardBits.ShardIdCount()
	}
	return
}

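// countFreeShardSlots estimates how many EC shard slots a data node can still
// accept on the given disk type: unused volume slots times DataShardsCount,
// minus the EC shards already present. Illustrative numbers only: with
// MaxVolumeCount=100, VolumeCount=90, 30 EC shards already stored, and the
// default of 10 data shards, the node has (100-90)*10-30 = 70 free slots.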
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
	if dn.DiskInfos == nil {
		return 0
	}
	diskInfo := dn.DiskInfos[string(diskType)]
	if diskInfo == nil {
		return 0
	}

	slots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
	if slots < 0 {
		return 0
	}

	return slots
}

func (ecNode *EcNode) localShardIdCount(vid uint32) int {
	for _, diskInfo := range ecNode.info.DiskInfos {
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			if vid == ecShardInfo.Id {
				shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
				return shardBits.ShardIdCount()
			}
		}
	}
	return 0
}

func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
	eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		if selectedDataCenter != "" && selectedDataCenter != string(dc) {
			return
		}

		freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
		ecNodes = append(ecNodes, &EcNode{
			info:       dn,
			dc:         dc,
			rack:       rack,
			freeEcSlot: int(freeEcSlots),
		})
		totalFreeEcSlots += freeEcSlots
	})
	return
}

func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
	fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeDeletedShardIds,
		})
		return deleteErr
	})
}

func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []uint32) error {
	fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
			VolumeId: uint32(volumeId),
			ShardIds: toBeUnmountedShardIds,
		})
		return unmountErr
	})
}

func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []uint32) error {
	fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)

	return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
		_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   uint32(volumeId),
			Collection: collection,
			ShardIds:   toBeMountedShardIds,
		})
		return mountErr
	})
}

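// ceilDivide returns the ceiling of a/b for positive integers, e.g.
// ceilDivide(14, 4) == 4.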
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				return erasure_coding.ShardBits(shardInfo.EcIndexBits)
			}
		}
	}
	return 0
}

func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
	foundVolume := false
	diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
	if found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
				foundVolume = true
				break
			}
		}
	} else {
		diskInfo = &master_pb.DiskInfo{
			Type: string(types.HardDriveType),
		}
		ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
	}

	if !foundVolume {
		var newShardBits erasure_coding.ShardBits
		for _, shardId := range shardIds {
			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
		}
		diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
			Id:          uint32(vid),
			Collection:  collection,
			EcIndexBits: uint32(newShardBits),
			DiskType:    string(types.HardDriveType),
		})
		ecNode.freeEcSlot -= len(shardIds)
	}

	return ecNode
}

func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
	if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
		for _, shardInfo := range diskInfo.EcShardInfos {
			if needle.VolumeId(shardInfo.Id) == vid {
				oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
				newShardBits := oldShardBits
				for _, shardId := range shardIds {
					newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
				}
				shardInfo.EcIndexBits = uint32(newShardBits)
				ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
			}
		}
	}

	return ecNode
}

func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
	countMap := make(map[string]int)
	for _, d := range data {
		id, count := identifierFn(d)
		countMap[id] += count
	}
	return countMap
}

func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
	groupMap := make(map[string][]*EcNode)
	for _, d := range data {
		id := identifierFn(d)
		groupMap[id] = append(groupMap[id], d)
	}
	return groupMap
}

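// ecBalancer carries the state for one EC balancing run: the discovered EC
// nodes, the target replica placement, whether changes are actually applied
// (as opposed to a dry run), and the degree of parallelism.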
type ecBalancer struct {
	commandEnv         *CommandEnv
	ecNodes            []*EcNode
	replicaPlacement   *super_block.ReplicaPlacement
	applyBalancing     bool
	maxParallelization int
}

func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
	return NewErrorWaitGroup(ecb.maxParallelization)
}

func (ecb *ecBalancer) racks() map[RackId]*EcRack {
	racks := make(map[RackId]*EcRack)
	for _, ecNode := range ecb.ecNodes {
		if racks[ecNode.rack] == nil {
			racks[ecNode.rack] = &EcRack{
				ecNodes: make(map[EcNodeId]*EcNode),
			}
		}
		racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
		racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
	}
	return racks
}

func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
	fmt.Printf("balanceEcVolumes %s\n", collection)

	if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
		return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
		return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
	}

	if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
		return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
	}

	return nil
}

func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

	ewg := ecb.errorWaitGroup()
	for vid, locations := range vidLocations {
		ewg.Add(func() error {
			return ecb.doDeduplicateEcShards(collection, vid, locations)
		})
	}
	return ewg.Wait()
}

func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
	// check whether this volume has ecNodes that are over average
	shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
	for _, ecNode := range locations {
		shardBits := findEcVolumeShards(ecNode, vid)
		for _, shardId := range shardBits.ShardIds() {
			shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
		}
	}
	for shardId, ecNodes := range shardToLocations {
		if len(ecNodes) <= 1 {
			continue
		}
		sortEcNodesByFreeslotsAscending(ecNodes)
		fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
		if !ecb.applyBalancing {
			continue
		}

		duplicatedShardIds := []uint32{uint32(shardId)}
		for _, ecNode := range ecNodes[1:] {
			if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
				return err
			}
			ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
		}
	}
	return nil
}

func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)

	// spread the ec shards evenly
	ewg := ecb.errorWaitGroup()
	for vid, locations := range vidLocations {
		ewg.Add(func() error {
			return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
		})
	}
	return ewg.Wait()
}

func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
	return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
		shardBits := findEcVolumeShards(ecNode, vid)
		return string(ecNode.rack), shardBits.ShardIdCount()
	})
}

func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
	racks := ecb.racks()

	// calculate average number of shards an ec rack should have for one volume
	averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))

	// count how many racks the volume's shards span, and how many shards are in each rack
	rackToShardCount := countShardsByRack(vid, locations)
	rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
		return string(ecNode.rack)
	})

	// ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
	ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
	for rackId, count := range rackToShardCount {
		if count <= averageShardsPerEcRack {
			continue
		}
		possibleEcNodes := rackEcNodesWithVid[rackId]
		for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
			ecShardsToMove[shardId] = ecNode
		}
	}

	for shardId, ecNode := range ecShardsToMove {
		rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
		if err != nil {
			fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
			continue
		}

		var possibleDestinationEcNodes []*EcNode
		for _, n := range racks[rackId].ecNodes {
			possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
		}
		err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
		if err != nil {
			return err
		}

		rackToShardCount[string(rackId)] += 1
		rackToShardCount[string(ecNode.rack)] -= 1
		racks[rackId].freeEcSlot -= 1
		racks[ecNode.rack].freeEcSlot += 1
	}

	return nil
}

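// pickRackToBalanceShardsInto chooses a destination rack for one shard. Racks
// with no free slots, or already holding more shards of this volume than the
// replica placement allows for other racks, are skipped; among the remaining
// racks, one with the fewest shards of the volume is picked at random.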
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
	targets := []RackId{}
	targetShards := -1
	for _, shards := range rackToShardCount {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for rackId, rack := range rackToEcNodes {
		shards := rackToShardCount[string(rackId)]

		if rack.freeEcSlot <= 0 {
			details += fmt.Sprintf("  Skipped %s because it has no free slots\n", rackId)
			continue
		}
		if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.DiffRackCount {
			details += fmt.Sprintf("  Skipped %s because shards %d > replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
			continue
		}

		if shards < targetShards {
			// Favor racks with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, rackId)
		}
	}

	if len(targets) == 0 {
		return "", errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
	// collect vid => []ecNode, since previous steps can change the locations
	vidLocations := ecb.collectVolumeIdToEcNodes(collection)
	racks := ecb.racks()

	// spread the ec shards evenly
	ewg := ecb.errorWaitGroup()
	for vid, locations := range vidLocations {
		// count how many racks the volume's shards span, and how many shards are in each rack
		rackToShardCount := countShardsByRack(vid, locations)
		rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
			return string(ecNode.rack)
		})

		for rackId := range rackToShardCount {
			var possibleDestinationEcNodes []*EcNode
			for _, n := range racks[RackId(rackId)].ecNodes {
				if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
					possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
				}
			}
			sourceEcNodes := rackEcNodesWithVid[rackId]
			averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))

			ewg.Add(func() error {
				return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
			})
		}
	}
	return ewg.Wait()
}

func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
	for _, ecNode := range existingLocations {
		shardBits := findEcVolumeShards(ecNode, vid)
		overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode

		for _, shardId := range shardBits.ShardIds() {
			if overLimitCount <= 0 {
				break
			}

			fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)

			err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
			if err != nil {
				return err
			}

			overLimitCount--
		}
	}
	return nil
}

func (ecb *ecBalancer) balanceEcRacks() error {
	// balance one rack for all ec shards
	ewg := ecb.errorWaitGroup()
	for _, ecRack := range ecb.racks() {
		ewg.Add(func() error {
			return ecb.doBalanceEcRack(ecRack)
		})
	}
	return ewg.Wait()
}

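// doBalanceEcRack evens out the total EC shard count across the nodes of one
// rack. On each pass it takes the node with the most free EC slots as the
// receiver and the node with the fewest as the source, and moves one shard of
// a volume the receiver does not yet hold, provided the source is above the
// rack average and the receiver stays at or below it.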
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
	if len(ecRack.ecNodes) <= 1 {
		return nil
	}

	var rackEcNodes []*EcNode
	for _, node := range ecRack.ecNodes {
		rackEcNodes = append(rackEcNodes, node)
	}

	ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			return
		}
		for _, ecShardInfo := range diskInfo.EcShardInfos {
			count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
		}
		return ecNode.info.Id, count
	})

	var totalShardCount int
	for _, count := range ecNodeIdToShardCount {
		totalShardCount += count
	}

	averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))

	hasMove := true
	for hasMove {
		hasMove = false
		slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
			return b.freeEcSlot - a.freeEcSlot
		})
		emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
		emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
		if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {

			emptyNodeIds := make(map[uint32]bool)
			if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range emptyDiskInfo.EcShardInfos {
					emptyNodeIds[shards.Id] = true
				}
			}
			if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
				for _, shards := range fullDiskInfo.EcShardInfos {
					if _, found := emptyNodeIds[shards.Id]; !found {
						for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {

							fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)

							err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
							if err != nil {
								return err
							}

							ecNodeIdToShardCount[emptyNode.info.Id]++
							ecNodeIdToShardCount[fullNode.info.Id]--
							hasMove = true
							break
						}
						break
					}
				}
			}
		}
	}

	return nil
}

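// pickEcNodeToBalanceShardsInto chooses a destination node within a rack. It
// skips the source node, nodes without free slots, and nodes already holding
// more shards of this volume than the same-rack replica placement allows;
// among the remaining candidates, one with the fewest shards of the volume is
// picked at random.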
func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
	if existingLocation == nil {
		return nil, fmt.Errorf("INTERNAL: missing source nodes")
	}
	if len(possibleDestinations) == 0 {
		return nil, fmt.Errorf("INTERNAL: missing destination nodes")
	}

	nodeShards := map[*EcNode]int{}
	for _, node := range possibleDestinations {
		nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
	}

	targets := []*EcNode{}
	targetShards := -1
	for _, shards := range nodeShards {
		if shards > targetShards {
			targetShards = shards
		}
	}

	details := ""
	for _, node := range possibleDestinations {
		if node.info.Id == existingLocation.info.Id {
			continue
		}
		if node.freeEcSlot <= 0 {
			details += fmt.Sprintf("  Skipped %s because it has no free slots\n", node.info.Id)
			continue
		}

		shards := nodeShards[node]
		if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.SameRackCount+1 {
			details += fmt.Sprintf("  Skipped %s because shards %d > replica placement limit for the rack (%d + 1)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
			continue
		}

		if shards < targetShards {
			// Favor nodes with fewer shards, to ensure a uniform distribution.
			targets = nil
			targetShards = shards
		}
		if shards == targetShards {
			targets = append(targets, node)
		}
	}

	if len(targets) == 0 {
		return nil, errors.New(details)
	}
	return targets[rand.IntN(len(targets))], nil
}

func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
	destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
	if err != nil {
		fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
		return nil
	}

	fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
	return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
}

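// pickNEcShardsToMoveFrom selects up to n shards of the volume to move away
// from the given nodes, always taking the next shard from the candidate node
// that currently holds the most shards of that volume.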
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
	picked := make(map[erasure_coding.ShardId]*EcNode)
	var candidateEcNodes []*CandidateEcNode
	for _, ecNode := range ecNodes {
		shardBits := findEcVolumeShards(ecNode, vid)
		if shardBits.ShardIdCount() > 0 {
			candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
				ecNode:     ecNode,
				shardCount: shardBits.ShardIdCount(),
			})
		}
	}
	slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
		return b.shardCount - a.shardCount
	})
	for i := 0; i < n; i++ {
		selectedEcNodeIndex := -1
		for i, candidateEcNode := range candidateEcNodes {
			shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
			if shardBits > 0 {
				selectedEcNodeIndex = i
				for _, shardId := range shardBits.ShardIds() {
					candidateEcNode.shardCount--
					picked[shardId] = candidateEcNode.ecNode
					candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
					break
				}
				break
			}
		}
		if selectedEcNodeIndex >= 0 {
			ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
				return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
			})
		}
	}

	return picked
}

func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
	vidLocations := make(map[needle.VolumeId][]*EcNode)
	for _, ecNode := range ecb.ecNodes {
		diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
		if !found {
			continue
		}
		for _, shardInfo := range diskInfo.EcShardInfos {
			// ignore if not in current collection
			if shardInfo.Collection == collection {
				vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
			}
		}
	}
	return vidLocations
}

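// EcBalance balances EC shards for the given collections: it removes duplicate
// shard copies, spreads shards across racks, evens them out within racks, and
// finally rebalances whole racks. A minimal usage sketch (illustrative only;
// it assumes a connected, locked CommandEnv and collection names gathered by
// the caller; "my_collection" is a placeholder):
//
//	rp, err := parseReplicaPlacementArg(commandEnv, "")
//	if err != nil {
//		return err
//	}
//	// dry run first (applyBalancing=false), then re-run with true to apply
//	if err := EcBalance(commandEnv, []string{"my_collection"}, "", rp, 10, false); err != nil {
//		return err
//	}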
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
	// collect all ec nodes
	allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
	if err != nil {
		return err
	}
	if totalFreeEcSlots < 1 {
		return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
	}

	ecb := &ecBalancer{
		commandEnv:         commandEnv,
		ecNodes:            allEcNodes,
		replicaPlacement:   ecReplicaPlacement,
		applyBalancing:     applyBalancing,
		maxParallelization: maxParallelization,
	}

	if len(collections) == 0 {
		fmt.Printf("WARNING: No collections to balance EC volumes across.\n")
	}
	for _, c := range collections {
		if err = ecb.balanceEcVolumes(c); err != nil {
			return err
		}
	}

	if err := ecb.balanceEcRacks(); err != nil {
		return fmt.Errorf("balance ec racks: %w", err)
	}

	return nil
}

// compileCollectionPattern compiles a regex pattern for collection matching.
// Empty patterns match empty collections only.
func compileCollectionPattern(pattern string) (*regexp.Regexp, error) {
	if pattern == "" {
		// empty pattern matches empty collection
		return regexp.Compile("^$")
	}
	return regexp.Compile(pattern)
}