// command_volume_fix_replication.go

package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"slices"
	"strconv"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
	collectionPattern *string
	// TODO: move parameter flags here so we don't shuffle them around via function calls.
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}

func (c *commandVolumeFixReplication) Help() string {
	return `add or remove replicas for volumes that are missing replicas or are over-replicated

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication                                # simulate only, do not take action
	volume.fix.replication -force                         # actually delete or copy the volume files and mount the volume
	volume.fix.replication -collectionPattern=important*  # fix only collections with prefix "important"

	Note:
		* each run only adds back one replica for each volume id that is under-replicated.
		  If multiple replicas are missing, e.g. the replica count is > 2, you may need to run this multiple times.
		* do not re-run this within seconds, since the new volume replica may take a few seconds
		  to register itself to the master.

`
}
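
// HasTag reports whether this command carries the given tag. The ResourceHeavy
// check below is currently short-circuited to false, so no tag is reported.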
func (c *commandVolumeFixReplication) HasTag(tag CommandTag) bool {
	return false && tag == ResourceHeavy // resource intensive only when deleting and checking with replicas.
}
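
// Do repeatedly collects the cluster topology, classifies volumes as
// under-replicated, over-replicated, or misplaced, then fixes under-replication
// (and optionally deletes extra or misplaced replicas) until no
// under-replicated volumes remain, or until one pass completes in simulation mode.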
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
	applyChanges := volFixReplicationCommand.Bool("force", false, "apply the fix")
	doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication")
	doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
	maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
	retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
	volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")

	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	infoAboutSimulationMode(writer, *applyChanges, "-force")

	commandEnv.noLock = !*applyChanges
	if err = commandEnv.confirmIsLocked(args); *applyChanges && err != nil {
		return
	}

	ewg := NewErrorWaitGroup(*maxParallelization)

	underReplicatedVolumeIdsCount := 1
	for underReplicatedVolumeIdsCount > 0 {
		fixedVolumeReplicas := map[string]int{}

		// collect topology information
		topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
		if err != nil {
			return err
		}

		// find all volumes that need replication
		// collect all data nodes
		volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)

		if len(allLocations) == 0 {
			return fmt.Errorf("no data nodes at all")
		}

		// find all under-replicated volumes
		var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
		for vid, replicas := range volumeReplicas {
			replica := replicas[0]
			replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
			switch {
			case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
				underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
			case isMisplaced(replicas, replicaPlacement):
				misplacedVolumeIds = append(misplacedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s is not well placed %s\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id)
			case replicaPlacement.GetCopyCount() < len(replicas):
				overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
				fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
			}
		}

		underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)

		if !commandEnv.isLocked() {
			return fmt.Errorf("lock is lost")
		}

		ewg.Reset()
		ewg.Add(func() error {
			// find the most underpopulated data nodes
			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
			return err
		})
		if *doDelete {
			ewg.Add(func() error {
				return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete)
			})
			ewg.Add(func() error {
				return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume)
			})
		}
		if err := ewg.Wait(); err != nil {
			return err
		}

		if !*applyChanges {
			break
		}

		// check that the topology has been updated
		if len(fixedVolumeReplicas) > 0 {
			fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
			for k := range fixedVolumeReplicas {
				fixedVolumes = append(fixedVolumes, k)
			}
			volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
			if err != nil {
				return err
			}
			for _, volumeIdLocation := range volumeIdLocations {
				volumeId := volumeIdLocation.VolumeOrFileId
				volumeIdLocationCount := len(volumeIdLocation.Locations)
				i := 0
				for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
					fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
					time.Sleep(time.Duration(i+1) * time.Second * 7)
					volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
					if err != nil {
						return err
					}
					volumeIdLocationCount = len(volumeLocIds[0].Locations)
					if *retryCount <= i {
						return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
					}
					i += 1
				}
			}
		}
	}
	return nil
}
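
// collectVolumeReplicaLocations walks the topology and returns, for every volume
// id, the list of its replicas (with their locations), plus the list of all data
// node locations seen.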
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(topologyInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(string(dc), string(rack), dn)
		for _, diskInfo := range dn.DiskInfos {
			for _, v := range diskInfo.VolumeInfos {
				volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
					location: &loc,
					info:     v,
				})
			}
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}
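
// SelectOneVolumeFunc picks one replica out of a set of replicas of the same
// volume, typically the one to delete.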
type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
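
// checkOneVolume reads the needle index databases of two replicas and compares
// their contents on disk, returning an error if they are out of sync.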
func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) {
	aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
	defer func() {
		aDB.Close()
		bDB.Close()
	}()

	// read index db
	readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
	if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
	}
	if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil {
		return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
	}

	if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil {
		return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
	}

	return
}
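
// deleteOneVolume removes one replica per listed volume id, selected by
// selectOneVolumeFn, after optionally verifying that the replicas are in sync.
// Volumes whose collection does not match -collectionPattern, or whose replicas
// disagree on the collection name, are skipped.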
func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
	if len(volumeIds) == 0 {
		// nothing to do
		return nil
	}

	for _, vid := range volumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
		replica := selectOneVolumeFn(replicas, replicaPlacement)

		// check collection name pattern
		if *c.collectionPattern != "" {
			matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
			if err != nil {
				return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
			}
			if !matched {
				continue
			}
		}

		collectionIsMismatch := false
		for _, volumeReplica := range replicas {
			if volumeReplica.info.Collection != replica.info.Collection {
				fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
				collectionIsMismatch = true
			}
		}
		if collectionIsMismatch {
			continue
		}

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !applyChanges {
			break
		}

		if doCheck {
			var checkErr error
			for _, replicaB := range replicas {
				if replicaB.location.dataNode == replica.location.dataNode {
					continue
				}
				if checkErr = checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); checkErr != nil {
					fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, checkErr)
					break
				}
			}
			if checkErr != nil {
				continue
			}
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id),
			pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil {
			fmt.Fprintf(writer, "deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
		}
	}
	return nil
}
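
// fixUnderReplicatedVolumes attempts to add one replica for each listed volume
// id, retrying up to retryCount times per volume. It returns a map from volume
// id (as a string) to the replica count observed before the fix, so the caller
// can wait until the topology reflects the new replica.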
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
	fixedVolumes = map[string]int{}
	if len(volumeIds) == 0 {
		return fixedVolumes, nil
	}
	if len(volumeIds) > volumesPerStep && volumesPerStep > 0 {
		volumeIds = volumeIds[0:volumesPerStep]
	}
	for _, vid := range volumeIds {
		for i := 0; i < retryCount+1; i++ {
			if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
				if applyChanges {
					fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
				}
				break
			} else {
				fmt.Fprintf(writer, "fixing under replicated volume %d: %v\n", vid, err)
			}
		}
	}
	return fixedVolumes, nil
}
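
// fixOneUnderReplicatedVolume copies the most recently modified replica of a
// volume to a data node with free slots that satisfies the replica placement,
// then adjusts the in-memory free-volume counts so later placement decisions
// account for the new replica.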
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
	replicas := volumeReplicas[vid]
	replica := pickOneReplicaToCopyFrom(replicas)
	replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
	foundNewLocation := false
	hasSkippedCollection := false
	keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
	fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
	for _, dst := range allLocations {
		// check whether data nodes satisfy the constraints
		if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
			// check collection name pattern
			if *c.collectionPattern != "" {
				matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
				if err != nil {
					return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
				}
				if !matched {
					hasSkippedCollection = true
					break
				}
			}

			// ask the volume server to replicate the volume
			foundNewLocation = true
			fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

			if !applyChanges {
				// adjust volume count
				addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
				break
			}

			err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
					VolumeId:       replica.info.Id,
					SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
				})
				if replicateErr != nil {
					return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
				}
				for {
					resp, recvErr := stream.Recv()
					if recvErr != nil {
						if recvErr == io.EOF {
							break
						} else {
							return recvErr
						}
					}
					if resp.ProcessedBytes > 0 {
						fmt.Fprintf(writer, "volume %d processed %s bytes\n", replica.info.Id, util.BytesToHumanReadable(uint64(resp.ProcessedBytes)))
					}
				}

				return nil
			})

			if err != nil {
				return err
			}

			// adjust volume count
			addVolumeCount(dst.dataNode.DiskInfos[replica.info.DiskType], 1)
			break
		}
	}

	if !foundNewLocation && !hasSkippedCollection {
		fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
	}

	return nil
}
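
// addVolumeCount adjusts the cached volume counts on a disk so that placement
// decisions within the same pass account for replicas that were just added.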
func addVolumeCount(info *master_pb.DiskInfo, count int) {
	if info == nil {
		return
	}
	info.VolumeCount += int64(count)
	info.FreeVolumeCount -= int64(count)
}
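
// keepDataNodesSorted orders data nodes by free volume slots for the given disk
// type, most free first, so placement prefers the least loaded nodes.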
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
	fn := capacityByFreeVolumeCount(diskType)
	slices.SortFunc(dataNodes, func(a, b location) int {
		return int(fn(b.dataNode) - fn(a.dataNode))
	})
}
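
// satisfyReplicaCurrentLocation reports whether the existing replicas already
// span enough data centers and racks (and, when SameRackCount > 0, whether some
// rack holds enough copies) to satisfy the replica placement.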
func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica) bool {
	existingDataCenters, existingRacks, _ := countReplicas(replicas)

	if replicaPlacement.DiffDataCenterCount+1 > len(existingDataCenters) {
		return false
	}
	if replicaPlacement.DiffRackCount+1 > len(existingRacks) {
		return false
	}
	if replicaPlacement.SameRackCount > 0 {
		foundSatisfyRack := false
		for _, rackCount := range existingRacks {
			if rackCount >= replicaPlacement.SameRackCount+1 {
				foundSatisfyRack = true
			}
		}
		return foundSatisfyRack
	}
	return true
}

/*
if on an existing data node {
	return false
}
if different from existing dcs {
	if lack on different dcs {
		return true
	} else {
		return false
	}
}
if not on primary dc {
	return false
}
if different from existing racks {
	if lack on different racks {
		return true
	} else {
		return false
	}
}
if not on primary rack {
	return false
}
if lacks on same rack {
	return true
} else {
	return false
}
*/
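// satisfyReplicaPlacement reports whether adding a replica at possibleLocation
// would still conform to the replica placement, following the decision tree in
// the comment above: never reuse a data node, only spread to a new data center
// or rack while the placement still lacks one, and otherwise only add copies on
// the primary data center and rack up to SameRackCount.
//
// For example, with placement "010" (one extra replica on a different rack in
// the same data center), a volume that already has one replica can only accept
// a second replica on a different rack of the same data center.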
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}
	// now this is the same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is the same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack

	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same-rack copy limit
		return false
	}
}
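
// findTopKeys returns the keys with the highest count in m, together with that
// count.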
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}
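
// isAmong reports whether key is present in keys.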
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}
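
// VolumeReplica is one copy of a volume together with the location it lives on.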
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}
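
// location identifies a data node together with the data center and rack it belongs to.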
type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}
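
// pickOneReplicaToCopyFrom returns the replica with the most recent
// modification time, which is used as the copy source.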
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}
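
// countReplicas tallies how many replicas live in each data center, rack, and
// data node.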
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}
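
// pickOneReplicaToDelete prefers deleting the smallest, oldest, and least
// compacted replica.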
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {

	slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
		if a.info.Size != b.info.Size {
			return int(a.info.Size - b.info.Size)
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return int(a.info.ModifiedAtSecond - b.info.ModifiedAtSecond)
		}
		if a.info.CompactRevision != b.info.CompactRevision {
			return int(a.info.CompactRevision - b.info.CompactRevision)
		}
		return 0
	})

	return replicas[0]
}

// isMisplaced reports whether any replica sits at a location that the replica
// placement would not allow, given the other replicas.
func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
			return true
		}
	}
	return false
}
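
// otherThan returns all replicas except the one at the given index.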
func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
	for i := 0; i < len(replicas); i++ {
		if index != i {
			others = append(others, replicas[i])
		}
	}
	return
}
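
// pickOneMisplacedVolume selects a replica whose removal leaves the remaining
// replicas well placed; if no such replica exists, it falls back to the usual
// deletion preference.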
func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
	var deletionCandidates []*VolumeReplica
	for i := 0; i < len(replicas); i++ {
		others := otherThan(replicas, i)
		if !isMisplaced(others, replicaPlacement) {
			deletionCandidates = append(deletionCandidates, replicas[i])
		}
	}
	if len(deletionCandidates) > 0 {
		return pickOneReplicaToDelete(deletionCandidates, replicaPlacement)
	}
	return pickOneReplicaToDelete(replicas, replicaPlacement)
}