broker_grpc_pub.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math/rand/v2"
  7. "net"
  8. "sync/atomic"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  14. "google.golang.org/grpc/peer"
  15. "google.golang.org/protobuf/proto"
  16. )
  17. // PUB
  18. // 1. gRPC API to configure a topic
  19. // 1.1 create a topic with existing partition count
  20. // 1.2 assign partitions to brokers
  21. // 2. gRPC API to lookup topic partitions
  22. // 3. gRPC API to publish by topic partitions
  23. // SUB
  24. // 1. gRPC API to look up a topic's partitions
  25. // Re-balance topic partitions for publishing
  26. // 1. collect stats from all the brokers
  27. // 2. Rebalance and configure new generation of partitions on brokers
  28. // 3. Tell brokers to close the current generation of publishing.
  29. // Publishers need to look up again and publish to the new generation of partitions.
  30. // Re-balance topic partitions for subscribing
  31. // 1. collect stats from all the brokers
  32. // Subscribers need to listen for new partitions and connect to the brokers.
  33. // Each subscription may not get data. It can act as a backup.
  34. func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
  35. req, err := stream.Recv()
  36. if err != nil {
  37. return err
  38. }
  39. response := &mq_pb.PublishMessageResponse{}
  40. // TODO check whether current broker should be the leader for the topic partition
  41. initMessage := req.GetInit()
  42. if initMessage == nil {
  43. response.Error = fmt.Sprintf("missing init message")
  44. glog.Errorf("missing init message")
  45. return stream.Send(response)
  46. }
  47. // get or generate a local partition
  48. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  49. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
  50. if getOrGenErr != nil {
  51. response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
  52. glog.Errorf("topic %v not found: %v", t, getOrGenErr)
  53. return stream.Send(response)
  54. }
  55. // connect to follower brokers
  56. if followerErr := localTopicPartition.MaybeConnectToFollowers(initMessage, b.grpcDialOption); followerErr != nil {
  57. response.Error = followerErr.Error()
  58. glog.Errorf("MaybeConnectToFollowers: %v", followerErr)
  59. return stream.Send(response)
  60. }
  61. var receivedSequence, acknowledgedSequence int64
  62. var isClosed bool
  63. // process each published messages
  64. clientName := fmt.Sprintf("%v-%4d", findClientAddress(stream.Context()), rand.IntN(10000))
  65. publisher := topic.NewLocalPublisher()
  66. localTopicPartition.Publishers.AddPublisher(clientName, publisher)
  67. // start sending ack to publisher
  68. ackInterval := int64(1)
  69. if initMessage.AckInterval > 0 {
  70. ackInterval = int64(initMessage.AckInterval)
  71. }
  72. go func() {
  73. defer func() {
  74. // println("stop sending ack to publisher", initMessage.PublisherName)
  75. }()
  76. lastAckTime := time.Now()
  77. for !isClosed {
  78. receivedSequence = atomic.LoadInt64(&localTopicPartition.AckTsNs)
  79. if acknowledgedSequence < receivedSequence && (receivedSequence-acknowledgedSequence >= ackInterval || time.Since(lastAckTime) > 1*time.Second) {
  80. acknowledgedSequence = receivedSequence
  81. response := &mq_pb.PublishMessageResponse{
  82. AckSequence: acknowledgedSequence,
  83. }
  84. if err := stream.Send(response); err != nil {
  85. glog.Errorf("Error sending response %v: %v", response, err)
  86. }
  87. // Update acknowledged offset for this publisher
  88. publisher.UpdateAckedOffset(acknowledgedSequence)
  89. // println("sent ack", acknowledgedSequence, "=>", initMessage.PublisherName)
  90. lastAckTime = time.Now()
  91. } else {
  92. time.Sleep(1 * time.Second)
  93. }
  94. }
  95. }()
  96. defer func() {
  97. // remove the publisher
  98. localTopicPartition.Publishers.RemovePublisher(clientName)
  99. if localTopicPartition.MaybeShutdownLocalPartition() {
  100. b.localTopicManager.RemoveLocalPartition(t, p)
  101. glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
  102. }
  103. }()
  104. // send a hello message
  105. stream.Send(&mq_pb.PublishMessageResponse{})
  106. defer func() {
  107. isClosed = true
  108. }()
  109. // process each published messages
  110. for {
  111. // receive a message
  112. req, err := stream.Recv()
  113. if err != nil {
  114. if err == io.EOF {
  115. break
  116. }
  117. glog.V(0).Infof("topic %v partition %v publish stream from %s error: %v", initMessage.Topic, initMessage.Partition, initMessage.PublisherName, err)
  118. break
  119. }
  120. // Process the received message
  121. dataMessage := req.GetData()
  122. if dataMessage == nil {
  123. continue
  124. }
  125. // Basic validation: ensure message can be unmarshaled as RecordValue
  126. if dataMessage.Value != nil {
  127. record := &schema_pb.RecordValue{}
  128. if err := proto.Unmarshal(dataMessage.Value, record); err == nil {
  129. } else {
  130. // If unmarshaling fails, we skip validation but log a warning
  131. glog.V(1).Infof("Could not unmarshal RecordValue for validation on topic %v partition %v: %v", initMessage.Topic, initMessage.Partition, err)
  132. }
  133. }
  134. // The control message should still be sent to the follower
  135. // to avoid timing issue when ack messages.
  136. // send to the local partition
  137. if err = localTopicPartition.Publish(dataMessage); err != nil {
  138. return fmt.Errorf("topic %v partition %v publish error: %w", initMessage.Topic, initMessage.Partition, err)
  139. }
  140. // Update published offset and last seen time for this publisher
  141. publisher.UpdatePublishedOffset(dataMessage.TsNs)
  142. }
  143. glog.V(0).Infof("topic %v partition %v publish stream from %s closed.", initMessage.Topic, initMessage.Partition, initMessage.PublisherName)
  144. return nil
  145. }
  146. // duplicated from master_grpc_server.go
  147. func findClientAddress(ctx context.Context) string {
  148. // fmt.Printf("FromContext %+v\n", ctx)
  149. pr, ok := peer.FromContext(ctx)
  150. if !ok {
  151. glog.Error("failed to get peer from ctx")
  152. return ""
  153. }
  154. if pr.Addr == net.Addr(nil) {
  155. glog.Error("failed to get peer address")
  156. return ""
  157. }
  158. return pr.Addr.String()
  159. }