// lock_ring.go
package lock_manager
import (
	"math"
	"sort"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// LockRingSnapshot is an immutable view of the ring membership captured at a
// point in time.
type LockRingSnapshot struct {
	servers []pb.ServerAddress // sorted ascending (see SetSnapshot / getSortedServers)
	ts      time.Time          // when this snapshot was taken
}
// LockRing tracks the current set of candidate lock servers and keeps a
// short history of membership snapshots (newest first: snapshots[0] is the
// current view). Old snapshots are pruned by compactSnapshots after
// snapshotInterval has elapsed.
type LockRing struct {
	sync.RWMutex
	snapshots        []*LockRingSnapshot               // newest-first history of membership views
	candidateServers map[pb.ServerAddress]struct{}     // current membership set
	lastUpdateTime   time.Time                         // last time membership changed
	lastCompactTime  time.Time                         // last time compactSnapshots ran
	snapshotInterval time.Duration                     // retention window before old snapshots are compacted
	onTakeSnapshot   func(snapshot []pb.ServerAddress) // optional callback, invoked each time a snapshot is taken
	cleanupWg        sync.WaitGroup                    // tracks pending delayed-compaction goroutines (see WaitForCleanup)
}
  24. func NewLockRing(snapshotInterval time.Duration) *LockRing {
  25. return &LockRing{
  26. snapshotInterval: snapshotInterval,
  27. candidateServers: make(map[pb.ServerAddress]struct{}),
  28. }
  29. }
  30. func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
  31. r.Lock()
  32. defer r.Unlock()
  33. r.onTakeSnapshot = onTakeSnapshot
  34. }
  35. // AddServer adds a server to the ring
  36. // if the previous snapshot passed the snapshot interval, create a new snapshot
  37. func (r *LockRing) AddServer(server pb.ServerAddress) {
  38. glog.V(0).Infof("add server %v", server)
  39. r.Lock()
  40. if _, found := r.candidateServers[server]; found {
  41. glog.V(0).Infof("add server: already exists %v", server)
  42. r.Unlock()
  43. return
  44. }
  45. r.lastUpdateTime = time.Now()
  46. r.candidateServers[server] = struct{}{}
  47. r.Unlock()
  48. r.takeSnapshotWithDelayedCompaction()
  49. }
  50. func (r *LockRing) RemoveServer(server pb.ServerAddress) {
  51. glog.V(0).Infof("remove server %v", server)
  52. r.Lock()
  53. if _, found := r.candidateServers[server]; !found {
  54. r.Unlock()
  55. return
  56. }
  57. r.lastUpdateTime = time.Now()
  58. delete(r.candidateServers, server)
  59. r.Unlock()
  60. r.takeSnapshotWithDelayedCompaction()
  61. }
  62. func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
  63. sort.Slice(servers, func(i, j int) bool {
  64. return servers[i] < servers[j]
  65. })
  66. r.Lock()
  67. r.lastUpdateTime = time.Now()
  68. // init candidateServers
  69. for _, server := range servers {
  70. r.candidateServers[server] = struct{}{}
  71. }
  72. r.Unlock()
  73. r.addOneSnapshot(servers)
  74. r.cleanupWg.Add(1)
  75. go func() {
  76. defer r.cleanupWg.Done()
  77. <-time.After(r.snapshotInterval)
  78. r.compactSnapshots()
  79. }()
  80. }
  81. func (r *LockRing) takeSnapshotWithDelayedCompaction() {
  82. r.doTakeSnapshot()
  83. r.cleanupWg.Add(1)
  84. go func() {
  85. defer r.cleanupWg.Done()
  86. <-time.After(r.snapshotInterval)
  87. r.compactSnapshots()
  88. }()
  89. }
  90. func (r *LockRing) doTakeSnapshot() {
  91. servers := r.getSortedServers()
  92. r.addOneSnapshot(servers)
  93. }
  94. func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
  95. r.Lock()
  96. defer r.Unlock()
  97. ts := time.Now()
  98. t := &LockRingSnapshot{
  99. servers: servers,
  100. ts: ts,
  101. }
  102. r.snapshots = append(r.snapshots, t)
  103. for i := len(r.snapshots) - 2; i >= 0; i-- {
  104. r.snapshots[i+1] = r.snapshots[i]
  105. }
  106. r.snapshots[0] = t
  107. if r.onTakeSnapshot != nil {
  108. r.onTakeSnapshot(t.servers)
  109. }
  110. }
// compactSnapshots trims the snapshot history. Snapshots are ordered
// newest-first; this keeps every snapshot younger than snapshotInterval plus
// the first one that is older, so readers retain one stable "previous" view.
func (r *LockRing) compactSnapshots() {
	r.Lock()
	defer r.Unlock()
	// Always attempt compaction when called, regardless of lastCompactTime
	// This ensures proper cleanup even with multiple concurrent compaction requests
	ts := time.Now()
	// remove old snapshots
	recentSnapshotIndex := 1
	for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
		// First entry older than the interval; entries after it are older still.
		if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
			break
		}
	}
	// keep the one that has been running for a while
	if recentSnapshotIndex+1 <= len(r.snapshots) {
		r.snapshots = r.snapshots[:recentSnapshotIndex+1]
	}
	r.lastCompactTime = ts
}
  130. func (r *LockRing) getSortedServers() []pb.ServerAddress {
  131. sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
  132. for server := range r.candidateServers {
  133. sortedServers = append(sortedServers, server)
  134. }
  135. sort.Slice(sortedServers, func(i, j int) bool {
  136. return sortedServers[i] < sortedServers[j]
  137. })
  138. return sortedServers
  139. }
  140. func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
  141. r.RLock()
  142. defer r.RUnlock()
  143. if len(r.snapshots) == 0 {
  144. return
  145. }
  146. return r.snapshots[0].servers
  147. }
// WaitForCleanup waits for all pending cleanup operations to complete
// This is useful for testing to ensure deterministic behavior
func (r *LockRing) WaitForCleanup() {
	// Blocks until every delayed-compaction goroutine tracked by cleanupWg
	// has finished.
	r.cleanupWg.Wait()
}
  153. // GetSnapshotCount safely returns the number of snapshots for testing
  154. func (r *LockRing) GetSnapshotCount() int {
  155. r.RLock()
  156. defer r.RUnlock()
  157. return len(r.snapshots)
  158. }
  159. func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  160. if len(servers) == 0 {
  161. return ""
  162. }
  163. x := util.HashStringToLong(key)
  164. if x < 0 {
  165. x = -x
  166. }
  167. x = x % int64(len(servers))
  168. return servers[x]
  169. }