broker_grpc_sub.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package broker
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
  10. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  16. )
  17. func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
  18. req, err := stream.Recv()
  19. if err != nil {
  20. return err
  21. }
  22. if req.GetInit() == nil {
  23. glog.Errorf("missing init message")
  24. return fmt.Errorf("missing init message")
  25. }
  26. ctx := stream.Context()
  27. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  28. t := topic.FromPbTopic(req.GetInit().Topic)
  29. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  30. glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
  31. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
  32. if getOrGenErr != nil {
  33. return getOrGenErr
  34. }
  35. subscriber := topic.NewLocalSubscriber()
  36. localTopicPartition.Subscribers.AddSubscriber(clientName, subscriber)
  37. glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
  38. isConnected := true
  39. sleepIntervalCount := 0
  40. var counter int64
  41. defer func() {
  42. isConnected = false
  43. localTopicPartition.Subscribers.RemoveSubscriber(clientName)
  44. glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  45. if localTopicPartition.MaybeShutdownLocalPartition() {
  46. b.localTopicManager.RemoveLocalPartition(t, partition)
  47. }
  48. }()
  49. startPosition := b.getRequestPosition(req.GetInit())
  50. imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize))
  51. // connect to the follower
  52. var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
  53. glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
  54. if req.GetInit().FollowerBroker != "" {
  55. follower := req.GetInit().FollowerBroker
  56. if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
  57. return fmt.Errorf("fail to dial %s: %v", follower, err)
  58. } else {
  59. defer func() {
  60. println("closing SubscribeFollowMe connection", follower)
  61. if subscribeFollowMeStream != nil {
  62. subscribeFollowMeStream.CloseSend()
  63. }
  64. // followerGrpcConnection.Close()
  65. }()
  66. followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
  67. if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
  68. return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
  69. } else {
  70. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  71. Message: &mq_pb.SubscribeFollowMeRequest_Init{
  72. Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
  73. Topic: req.GetInit().Topic,
  74. Partition: req.GetInit().GetPartitionOffset().Partition,
  75. ConsumerGroup: req.GetInit().ConsumerGroup,
  76. },
  77. },
  78. }); err != nil {
  79. return fmt.Errorf("fail to send init to %s: %v", follower, err)
  80. }
  81. }
  82. }
  83. glog.V(0).Infof("follower %s connected", follower)
  84. }
  85. go func() {
  86. var lastOffset int64
  87. for {
  88. ack, err := stream.Recv()
  89. if err != nil {
  90. if err == io.EOF {
  91. // the client has called CloseSend(). This is to ack the close.
  92. stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
  93. Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
  94. IsEndOfStream: true,
  95. },
  96. }})
  97. break
  98. }
  99. glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
  100. break
  101. }
  102. if ack.GetAck().Key == nil {
  103. // skip ack for control messages
  104. continue
  105. }
  106. imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
  107. currentLastOffset := imt.GetOldestAckedTimestamp()
  108. // Update acknowledged offset and last seen time for this subscriber when it sends an ack
  109. subscriber.UpdateAckedOffset(currentLastOffset)
  110. // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
  111. if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
  112. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  113. Message: &mq_pb.SubscribeFollowMeRequest_Ack{
  114. Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
  115. TsNs: currentLastOffset,
  116. },
  117. },
  118. }); err != nil {
  119. glog.Errorf("Error sending ack to follower: %v", err)
  120. break
  121. }
  122. lastOffset = currentLastOffset
  123. // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
  124. }
  125. }
  126. if lastOffset > 0 {
  127. glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
  128. if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
  129. glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
  130. }
  131. }
  132. if subscribeFollowMeStream != nil {
  133. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  134. Message: &mq_pb.SubscribeFollowMeRequest_Close{
  135. Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
  136. },
  137. }); err != nil {
  138. if err != io.EOF {
  139. glog.Errorf("Error sending close to follower: %v", err)
  140. }
  141. }
  142. }
  143. }()
  144. return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
  145. if !isConnected {
  146. return false
  147. }
  148. sleepIntervalCount++
  149. if sleepIntervalCount > 32 {
  150. sleepIntervalCount = 32
  151. }
  152. time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
  153. // Check if the client has disconnected by monitoring the context
  154. select {
  155. case <-ctx.Done():
  156. err := ctx.Err()
  157. if errors.Is(err, context.Canceled) {
  158. // Client disconnected
  159. return false
  160. }
  161. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  162. return false
  163. default:
  164. // Continue processing the request
  165. }
  166. return true
  167. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  168. // reset the sleep interval count
  169. sleepIntervalCount = 0
  170. for imt.IsInflight(logEntry.Key) {
  171. time.Sleep(137 * time.Millisecond)
  172. // Check if the client has disconnected by monitoring the context
  173. select {
  174. case <-ctx.Done():
  175. err := ctx.Err()
  176. if err == context.Canceled {
  177. // Client disconnected
  178. return false, nil
  179. }
  180. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  181. return false, nil
  182. default:
  183. // Continue processing the request
  184. }
  185. }
  186. if logEntry.Key != nil {
  187. imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
  188. }
  189. if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
  190. Data: &mq_pb.DataMessage{
  191. Key: logEntry.Key,
  192. Value: logEntry.Data,
  193. TsNs: logEntry.TsNs,
  194. },
  195. }}); err != nil {
  196. glog.Errorf("Error sending data: %v", err)
  197. return false, err
  198. }
  199. // Update received offset and last seen time for this subscriber
  200. subscriber.UpdateReceivedOffset(logEntry.TsNs)
  201. counter++
  202. return false, nil
  203. })
  204. }
  205. func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
  206. if initMessage == nil {
  207. return
  208. }
  209. offset := initMessage.GetPartitionOffset()
  210. offsetType := initMessage.OffsetType
  211. // reset to earliest or latest
  212. if offsetType == schema_pb.OffsetType_RESET_TO_EARLIEST {
  213. startPosition = log_buffer.NewMessagePosition(1, -3)
  214. return
  215. }
  216. if offsetType == schema_pb.OffsetType_RESET_TO_LATEST {
  217. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
  218. return
  219. }
  220. // use the exact timestamp
  221. if offsetType == schema_pb.OffsetType_EXACT_TS_NS {
  222. startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
  223. return
  224. }
  225. // try to resume
  226. if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
  227. glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
  228. startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
  229. return
  230. }
  231. if offsetType == schema_pb.OffsetType_RESUME_OR_EARLIEST {
  232. startPosition = log_buffer.NewMessagePosition(1, -5)
  233. } else if offsetType == schema_pb.OffsetType_RESUME_OR_LATEST {
  234. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -6)
  235. }
  236. return
  237. }