on_each_partition.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package sub_client
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. "io"
  11. )
// KeyedOffset pairs a message key with an offset.
//
// In this file, values read from TopicSubscriber.PartitionOffsetChan are
// forwarded to the broker as acknowledgements: Key becomes the ack's Key and
// Offset becomes the ack's Sequence.
// NOTE(review): presumably this is the element type of PartitionOffsetChan —
// confirm against the TopicSubscriber definition.
type KeyedOffset struct {
	// Key is the key of the message being acknowledged.
	Key []byte
	// Offset is sent to the broker as the ack Sequence.
	Offset int64
}
  16. func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}, onDataMessageFn OnDataMessageFn) error {
  17. // connect to the partition broker
  18. return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  19. subscribeClient, err := client.SubscribeMessage(context.Background())
  20. if err != nil {
  21. return fmt.Errorf("create subscribe client: %w", err)
  22. }
  23. slidingWindowSize := sub.SubscriberConfig.SlidingWindowSize
  24. if slidingWindowSize <= 0 {
  25. slidingWindowSize = 1
  26. }
  27. po := findPartitionOffset(sub.ContentConfig.PartitionOffsets, assigned.Partition)
  28. if po == nil {
  29. po = &schema_pb.PartitionOffset{
  30. Partition: assigned.Partition,
  31. StartTsNs: sub.ContentConfig.OffsetTsNs,
  32. }
  33. }
  34. if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
  35. Message: &mq_pb.SubscribeMessageRequest_Init{
  36. Init: &mq_pb.SubscribeMessageRequest_InitMessage{
  37. ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
  38. ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
  39. Topic: sub.ContentConfig.Topic.ToPbTopic(),
  40. PartitionOffset: po,
  41. OffsetType: sub.ContentConfig.OffsetType,
  42. Filter: sub.ContentConfig.Filter,
  43. FollowerBroker: assigned.FollowerBroker,
  44. SlidingWindowSize: slidingWindowSize,
  45. },
  46. },
  47. }); err != nil {
  48. glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
  49. }
  50. glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
  51. if sub.OnCompletionFunc != nil {
  52. defer sub.OnCompletionFunc()
  53. }
  54. go func() {
  55. for {
  56. select {
  57. case <-sub.ctx.Done():
  58. subscribeClient.CloseSend()
  59. return
  60. case <-stopCh:
  61. subscribeClient.CloseSend()
  62. return
  63. case ack, ok := <-sub.PartitionOffsetChan:
  64. if !ok {
  65. subscribeClient.CloseSend()
  66. return
  67. }
  68. subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
  69. Message: &mq_pb.SubscribeMessageRequest_Ack{
  70. Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
  71. Key: ack.Key,
  72. Sequence: ack.Offset,
  73. },
  74. },
  75. })
  76. }
  77. }
  78. }()
  79. for {
  80. // glog.V(0).Infof("subscriber %s/%s waiting for message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  81. resp, err := subscribeClient.Recv()
  82. if err != nil {
  83. if errors.Is(err, io.EOF) {
  84. return nil
  85. }
  86. return fmt.Errorf("subscribe recv: %w", err)
  87. }
  88. if resp.Message == nil {
  89. glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  90. continue
  91. }
  92. select {
  93. case <-sub.ctx.Done():
  94. return nil
  95. case <-stopCh:
  96. return nil
  97. default:
  98. }
  99. switch m := resp.Message.(type) {
  100. case *mq_pb.SubscribeMessageResponse_Data:
  101. if m.Data.Ctrl != nil {
  102. glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
  103. continue
  104. }
  105. if len(m.Data.Key) == 0 {
  106. // fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
  107. continue
  108. }
  109. onDataMessageFn(m)
  110. case *mq_pb.SubscribeMessageResponse_Ctrl:
  111. // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
  112. if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
  113. return io.EOF
  114. }
  115. }
  116. }
  117. })
  118. }
  119. func findPartitionOffset(partitionOffsets []*schema_pb.PartitionOffset, partition *schema_pb.Partition) *schema_pb.PartitionOffset {
  120. for _, po := range partitionOffsets {
  121. if po.Partition == partition {
  122. return po
  123. }
  124. }
  125. return nil
  126. }