// volume_grpc_client_to_master.go
  1. package weed_server
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "google.golang.org/grpc"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/security"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  12. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  13. "golang.org/x/net/context"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. )
// GetMaster returns the master address this volume server most recently
// established a heartbeat to (vs.currentMaster).
// NOTE(review): ctx is currently unused by the implementation.
func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
	return vs.currentMaster
}
  21. func (vs *VolumeServer) checkWithMaster() (err error) {
  22. for {
  23. for _, master := range vs.SeedMasterNodes {
  24. err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  25. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  26. if err != nil {
  27. return fmt.Errorf("get master %s configuration: %v", master, err)
  28. }
  29. vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  30. backend.LoadFromPbStorageBackends(resp.StorageBackends)
  31. return nil
  32. })
  33. if err == nil {
  34. return
  35. } else {
  36. glog.V(0).Infof("checkWithMaster %s: %v", master, err)
  37. }
  38. }
  39. time.Sleep(1790 * time.Millisecond)
  40. }
  41. }
// heartbeat runs the volume server's master-heartbeat loop. It cycles
// through the seed master nodes (following any leader redirect returned
// by doHeartbeatWithRetry) until StopHeartbeat flips vs.isHeartbeating.
// "duplicate UUIDs" errors are retried with exponential backoff; any
// other error waits one pulse interval before the next attempt.
func (vs *VolumeServer) heartbeat() {
	glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
	vs.store.SetDataCenter(vs.dataCenter)
	vs.store.SetRack(vs.rack)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
	var err error
	var newLeader pb.ServerAddress
	duplicateRetryCount := 0
	for vs.isHeartbeating {
		for _, master := range vs.SeedMasterNodes {
			if newLeader != "" {
				// the new leader may actually be the same master;
				// need to wait a bit before adding itself
				time.Sleep(3 * time.Second)
				master = newLeader
			}
			vs.store.MasterAddress = master
			newLeader, err = vs.doHeartbeatWithRetry(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second, duplicateRetryCount)
			if err != nil {
				glog.V(0).Infof("heartbeat to %s error: %v", master, err)
				// Check if this is a duplicate UUID retry error
				// (matched by message text — see doHeartbeatWithRetry).
				if strings.Contains(err.Error(), "duplicate UUIDs detected, retrying connection") {
					duplicateRetryCount++
					retryDelay := time.Duration(1<<(duplicateRetryCount-1)) * 2 * time.Second // exponential backoff: 2s, 4s, 8s
					glog.V(0).Infof("Waiting %v before retrying due to duplicate UUID detection...", retryDelay)
					time.Sleep(retryDelay)
				} else {
					// Regular error, reset duplicate retry count
					duplicateRetryCount = 0
					time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
				}
				// Forget any stale leader/master binding before retrying.
				newLeader = ""
				vs.store.MasterAddress = ""
			} else {
				// Successful connection, reset retry count
				duplicateRetryCount = 0
			}
			if !vs.isHeartbeating {
				break
			}
		}
	}
}
  85. func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
  86. if !vs.isHeartbeating {
  87. return true
  88. }
  89. vs.isHeartbeating = false
  90. close(vs.stopChan)
  91. return false
  92. }
// doHeartbeat is a convenience wrapper around doHeartbeatWithRetry with a
// duplicate-UUID retry count of zero (i.e. a fresh, non-retried attempt).
func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
	return vs.doHeartbeatWithRetry(masterAddress, grpcDialOption, sleepInterval, 0)
}
  96. func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration, duplicateRetryCount int) (newLeader pb.ServerAddress, err error) {
  97. ctx, cancel := context.WithCancel(context.Background())
  98. defer cancel()
  99. grpcConnection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption)
  100. if err != nil {
  101. return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
  102. }
  103. defer grpcConnection.Close()
  104. client := master_pb.NewSeaweedClient(grpcConnection)
  105. stream, err := client.SendHeartbeat(ctx)
  106. if err != nil {
  107. glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
  108. return "", err
  109. }
  110. glog.V(0).Infof("Heartbeat to: %v", masterAddress)
  111. vs.currentMaster = masterAddress
  112. doneChan := make(chan error, 1)
  113. go func() {
  114. for {
  115. in, err := stream.Recv()
  116. if err != nil {
  117. doneChan <- err
  118. return
  119. }
  120. if len(in.DuplicatedUuids) > 0 {
  121. var duplicateDir []string
  122. for _, loc := range vs.store.Locations {
  123. for _, uuid := range in.DuplicatedUuids {
  124. if uuid == loc.DirectoryUuid {
  125. duplicateDir = append(duplicateDir, loc.Directory)
  126. }
  127. }
  128. }
  129. // Implement retry logic for potential race conditions
  130. const maxRetries = 3
  131. if duplicateRetryCount < maxRetries {
  132. retryDelay := time.Duration(1<<duplicateRetryCount) * 2 * time.Second // exponential backoff: 2s, 4s, 8s
  133. glog.Errorf("Master reported duplicate volume directories: %v (retry %d/%d)", duplicateDir, duplicateRetryCount+1, maxRetries)
  134. glog.Errorf("This might be due to a race condition during reconnection. Waiting %v before retrying...", retryDelay)
  135. // Return error to trigger retry with increased count
  136. doneChan <- fmt.Errorf("duplicate UUIDs detected, retrying connection (attempt %d/%d)", duplicateRetryCount+1, maxRetries)
  137. return
  138. } else {
  139. // After max retries, this is likely a real duplicate
  140. glog.Errorf("Shut down Volume Server due to persistent duplicate volume directories after %d retries: %v", maxRetries, duplicateDir)
  141. glog.Errorf("Please check if another volume server is using the same directory")
  142. os.Exit(1)
  143. }
  144. }
  145. volumeOptsChanged := false
  146. if vs.store.GetPreallocate() != in.GetPreallocate() {
  147. vs.store.SetPreallocate(in.GetPreallocate())
  148. volumeOptsChanged = true
  149. }
  150. if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
  151. vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
  152. volumeOptsChanged = true
  153. }
  154. if volumeOptsChanged {
  155. if vs.store.MaybeAdjustVolumeMax() {
  156. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  157. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
  158. return
  159. }
  160. }
  161. }
  162. if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
  163. glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
  164. newLeader = pb.ServerAddress(in.GetLeader())
  165. doneChan <- nil
  166. return
  167. }
  168. }
  169. }()
  170. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  171. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  172. return "", err
  173. }
  174. if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
  175. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  176. return "", err
  177. }
  178. volumeTickChan := time.NewTicker(sleepInterval)
  179. defer volumeTickChan.Stop()
  180. ecShardTickChan := time.NewTicker(17 * sleepInterval)
  181. defer ecShardTickChan.Stop()
  182. dataCenter := vs.store.GetDataCenter()
  183. rack := vs.store.GetRack()
  184. ip := vs.store.Ip
  185. port := uint32(vs.store.Port)
  186. for {
  187. select {
  188. case volumeMessage := <-vs.store.NewVolumesChan:
  189. deltaBeat := &master_pb.Heartbeat{
  190. Ip: ip,
  191. Port: port,
  192. DataCenter: dataCenter,
  193. Rack: rack,
  194. NewVolumes: []*master_pb.VolumeShortInformationMessage{
  195. &volumeMessage,
  196. },
  197. }
  198. glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  199. if err = stream.Send(deltaBeat); err != nil {
  200. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  201. return "", err
  202. }
  203. case ecShardMessage := <-vs.store.NewEcShardsChan:
  204. deltaBeat := &master_pb.Heartbeat{
  205. Ip: ip,
  206. Port: port,
  207. DataCenter: dataCenter,
  208. Rack: rack,
  209. NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
  210. &ecShardMessage,
  211. },
  212. }
  213. glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
  214. erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  215. if err = stream.Send(deltaBeat); err != nil {
  216. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  217. return "", err
  218. }
  219. case volumeMessage := <-vs.store.DeletedVolumesChan:
  220. deltaBeat := &master_pb.Heartbeat{
  221. Ip: ip,
  222. Port: port,
  223. DataCenter: dataCenter,
  224. Rack: rack,
  225. DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
  226. &volumeMessage,
  227. },
  228. }
  229. glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  230. if err = stream.Send(deltaBeat); err != nil {
  231. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  232. return "", err
  233. }
  234. case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  235. deltaBeat := &master_pb.Heartbeat{
  236. Ip: ip,
  237. Port: port,
  238. DataCenter: dataCenter,
  239. Rack: rack,
  240. DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
  241. &ecShardMessage,
  242. },
  243. }
  244. glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
  245. erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  246. if err = stream.Send(deltaBeat); err != nil {
  247. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  248. return "", err
  249. }
  250. case <-volumeTickChan.C:
  251. glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  252. vs.store.MaybeAdjustVolumeMax()
  253. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  254. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  255. return "", err
  256. }
  257. case <-ecShardTickChan.C:
  258. glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  259. if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
  260. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  261. return "", err
  262. }
  263. case err = <-doneChan:
  264. return
  265. case <-vs.stopChan:
  266. var volumeMessages []*master_pb.VolumeInformationMessage
  267. emptyBeat := &master_pb.Heartbeat{
  268. Ip: ip,
  269. Port: port,
  270. PublicUrl: vs.store.PublicUrl,
  271. MaxFileKey: uint64(0),
  272. DataCenter: dataCenter,
  273. Rack: rack,
  274. Volumes: volumeMessages,
  275. HasNoVolumes: len(volumeMessages) == 0,
  276. }
  277. glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  278. if err = stream.Send(emptyBeat); err != nil {
  279. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  280. return "", err
  281. }
  282. return
  283. }
  284. }
  285. }