broker_grpc_query.go

package broker

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// BufferRange represents a range of buffer indexes that have been flushed to disk.
type BufferRange struct {
	start int64
	end   int64
}
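
// For example, a log file whose buffer_start metadata is 5 and which contains
// 3 chunks is treated as covering the inclusive buffer range [5, 7]
// (see buildBufferStartDeduplicationMap below).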

// ErrNoPartitionAssignment indicates no broker assignment was found for the partition.
// This is a normal case: it means there are no unflushed messages for this partition.
var ErrNoPartitionAssignment = errors.New("no broker assignment found for partition")

// GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
// that have not yet been flushed to disk, using buffer_start metadata for
// deduplication. Responses are streamed, and callers can filter by buffer index
// for better performance. If this broker does not host the topic/partition, the
// request is redirected to the broker that does.
func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
	// Convert protobuf types to internal types
	t := topic.FromPbTopic(req.Topic)
	partition := topic.FromPbPartition(req.Partition)

	glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)

	// Get the local partition for this topic/partition
	b.accessLock.Lock()
	localPartition := b.localTopicManager.GetLocalPartition(t, partition)
	b.accessLock.Unlock()

	if localPartition == nil {
		// Topic/partition not found locally, attempt to find the correct broker and redirect
		glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition)

		// Look up which broker hosts this topic/partition
		brokerHost, err := b.findBrokerForTopicPartition(req.Topic, req.Partition)
		if err != nil {
			if errors.Is(err, ErrNoPartitionAssignment) {
				// Normal case: no broker assignment means no unflushed messages
				glog.V(2).Infof("No broker assignment for %v %v - no unflushed messages", t, partition)
				return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
					EndOfStream: true,
				})
			}
			return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
				Error:       fmt.Sprintf("failed to find broker for %v %v: %v", t, partition, err),
				EndOfStream: true,
			})
		}

		if brokerHost == "" {
			// This should not happen after the ErrNoPartitionAssignment check, but keep it for safety
			glog.V(2).Infof("Empty broker host for %v %v - no unflushed messages", t, partition)
			return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
				EndOfStream: true,
			})
		}

		// Redirect to the correct broker
		glog.V(1).Infof("Redirecting GetUnflushedMessages request for %v %v to broker %s", t, partition, brokerHost)
		return b.redirectGetUnflushedMessages(brokerHost, req, stream)
	}

	// Build deduplication map from existing log files using buffer_start metadata
	partitionDir := topic.PartitionDir(t, partition)
	flushedBufferRanges, err := b.buildBufferStartDeduplicationMap(partitionDir)
	if err != nil {
		glog.Errorf("Failed to build deduplication map for %v %v: %v", t, partition, err)
		// Continue with an empty map - better to potentially duplicate than to miss data
		flushedBufferRanges = make([]BufferRange, 0)
	}

	// Use the buffer_start index for precise deduplication
	lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
	startBufferIndex := req.StartBufferIndex
	startTimeNs := lastFlushTsNs // Still respect the last flush time for safety

	glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
		t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))

	// Stream messages from the LogBuffer with filtering
	messageCount := 0
	startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)

	// Use the LoopProcessLogDataWithBatchIndex method to avoid code duplication
	_, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
		"GetUnflushedMessages",
		startPosition,
		0,                            // stopTsNs = 0 means process all available data
		func() bool { return false }, // waitForDataFn = false means don't wait for new data
		func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
			// Apply buffer index filtering if specified
			if startBufferIndex > 0 && batchIndex < startBufferIndex {
				glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
				return false, nil
			}

			// Check if this message is from a buffer range that's already been flushed
			if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
				glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
				return false, nil
			}

			// Stream this message
			err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
				Message: &mq_pb.LogEntry{
					TsNs:             logEntry.TsNs,
					Key:              logEntry.Key,
					Data:             logEntry.Data,
					PartitionKeyHash: uint32(logEntry.PartitionKeyHash),
				},
				EndOfStream: false,
			})
			if err != nil {
				glog.Errorf("Failed to stream message: %v", err)
				return true, err // isDone = true to stop processing
			}

			messageCount++
			return false, nil // Continue processing
		},
	)

	// Handle collection errors
	if err != nil && err != log_buffer.ResumeFromDiskError {
		streamErr := stream.Send(&mq_pb.GetUnflushedMessagesResponse{
			Error:       fmt.Sprintf("failed to stream unflushed messages: %v", err),
			EndOfStream: true,
		})
		if streamErr != nil {
			glog.Errorf("Failed to send error response: %v", streamErr)
		}
		return err
	}

	// Send end-of-stream marker
	err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
		EndOfStream: true,
	})
	if err != nil {
		glog.Errorf("Failed to send end-of-stream marker: %v", err)
		return err
	}

	glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
	return nil
}
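
// A caller-side sketch of consuming this stream (hypothetical snippet, assuming
// an established mq_pb.SeaweedMessagingClient connection; topicPb and partitionPb
// are placeholder request values):
//
//	stream, err := client.GetUnflushedMessages(ctx, &mq_pb.GetUnflushedMessagesRequest{
//		Topic:     topicPb,
//		Partition: partitionPb,
//	})
//	if err != nil {
//		return err
//	}
//	for {
//		resp, err := stream.Recv()
//		if errors.Is(err, io.EOF) {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		if resp.Error != "" {
//			return errors.New(resp.Error)
//		}
//		if resp.Message != nil {
//			// process resp.Message (TsNs, Key, Data, PartitionKeyHash)
//		}
//		if resp.EndOfStream {
//			break
//		}
//	}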

// buildBufferStartDeduplicationMap scans log files to build a map of buffer ranges
// that have been flushed to disk, using the buffer_start metadata.
func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir string) ([]BufferRange, error) {
	var flushedRanges []BufferRange

	// List all files in the partition directory using the filer client accessor.
	// Use pagination to handle directories with more than 1000 files.
	err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		var lastFileName string
		var hasMore = true

		for hasMore {
			var currentBatchProcessed int

			err := filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
				currentBatchProcessed++
				hasMore = !isLast // If this is the last entry of a full batch, there might be more
				lastFileName = entry.Name

				if entry.IsDirectory {
					return nil
				}

				// Skip Parquet files - they don't represent buffer ranges
				if strings.HasSuffix(entry.Name, ".parquet") {
					return nil
				}

				// Skip offset files
				if strings.HasSuffix(entry.Name, ".offset") {
					return nil
				}

				// Get the buffer start for this file
				bufferStart, err := b.getLogBufferStartFromFile(entry)
				if err != nil {
					glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err)
					return nil // Continue with other files
				}
				if bufferStart == nil {
					// File has no buffer metadata - skip deduplication for this file
					glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name)
					return nil
				}

				// Calculate the buffer range covered by this file
				chunkCount := int64(len(entry.GetChunks()))
				if chunkCount > 0 {
					fileRange := BufferRange{
						start: bufferStart.StartIndex,
						end:   bufferStart.StartIndex + chunkCount - 1,
					}
					flushedRanges = append(flushedRanges, fileRange)
					glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end)
				}

				return nil
			}, lastFileName, false, 1000) // Start from the last processed file name for the next batch
			if err != nil {
				return err
			}

			// If we processed fewer than 1000 entries, we've reached the end
			if currentBatchProcessed < 1000 {
				hasMore = false
			}
		}
		return nil
	})

	if err != nil {
		return flushedRanges, fmt.Errorf("failed to list partition directory %s: %v", partitionDir, err)
	}

	return flushedRanges, nil
}

// getLogBufferStartFromFile extracts LogBufferStart metadata from a log file.
func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
	if entry.Extended == nil {
		return nil, nil
	}

	// Only support the binary buffer_start format
	if startData, exists := entry.Extended["buffer_start"]; exists {
		if len(startData) == 8 {
			startIndex := int64(binary.BigEndian.Uint64(startData))
			if startIndex > 0 {
				return &LogBufferStart{StartIndex: startIndex}, nil
			}
		} else {
			return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
		}
	}

	return nil, nil
}
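
// For reference, the 8-byte value read above is a big-endian uint64. A minimal
// sketch of the corresponding encoder (hypothetical helper, not part of the
// flush path in this file) would be:
//
//	func encodeBufferStart(startIndex int64) []byte {
//		buf := make([]byte, 8)
//		binary.BigEndian.PutUint64(buf, uint64(startIndex))
//		return buf
//	}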

// isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges.
func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
	for _, flushedRange := range flushedRanges {
		if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
			return true
		}
	}
	return false
}
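
// For example, with flushedRanges of [3, 5] and [9, 9], buffer indexes 4 and 9
// are reported as flushed, while 6 is not (range bounds are inclusive).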

// findBrokerForTopicPartition finds which broker hosts the specified topic/partition.
func (b *MessageQueueBroker) findBrokerForTopicPartition(topic *schema_pb.Topic, partition *schema_pb.Partition) (string, error) {
	// Use LookupTopicBrokers to find which broker hosts this topic/partition
	ctx := context.Background()
	lookupReq := &mq_pb.LookupTopicBrokersRequest{
		Topic: topic,
	}

	// If we're not the lock owner (balancer), we need to redirect to the balancer first
	var lookupResp *mq_pb.LookupTopicBrokersResponse
	var err error

	if !b.isLockOwner() {
		// Redirect to the balancer to get topic broker assignments
		balancerAddress := pb.ServerAddress(b.lockAsBalancer.LockOwner())
		err = b.withBrokerClient(false, balancerAddress, func(client mq_pb.SeaweedMessagingClient) error {
			lookupResp, err = client.LookupTopicBrokers(ctx, lookupReq)
			return err
		})
	} else {
		// We are the balancer, handle the lookup directly
		lookupResp, err = b.LookupTopicBrokers(ctx, lookupReq)
	}

	if err != nil {
		return "", fmt.Errorf("failed to lookup topic brokers: %v", err)
	}

	// Find the broker assignment that matches our partition
	for _, assignment := range lookupResp.BrokerPartitionAssignments {
		if b.partitionsMatch(partition, assignment.Partition) {
			if assignment.LeaderBroker != "" {
				return assignment.LeaderBroker, nil
			}
		}
	}

	return "", ErrNoPartitionAssignment
}

// partitionsMatch checks if two partitions represent the same partition.
func (b *MessageQueueBroker) partitionsMatch(p1, p2 *schema_pb.Partition) bool {
	return p1.RingSize == p2.RingSize &&
		p1.RangeStart == p2.RangeStart &&
		p1.RangeStop == p2.RangeStop &&
		p1.UnixTimeNs == p2.UnixTimeNs
}

// redirectGetUnflushedMessages forwards the GetUnflushedMessages request to the correct broker.
func (b *MessageQueueBroker) redirectGetUnflushedMessages(brokerHost string, req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
	ctx := stream.Context()

	// Connect to the target broker and forward the request
	return b.withBrokerClient(false, pb.ServerAddress(brokerHost), func(client mq_pb.SeaweedMessagingClient) error {
		// Create a new stream to the target broker
		targetStream, err := client.GetUnflushedMessages(ctx, req)
		if err != nil {
			return fmt.Errorf("failed to create stream to broker %s: %v", brokerHost, err)
		}

		// Forward all responses from the target broker to our client
		for {
			response, err := targetStream.Recv()
			if err != nil {
				if errors.Is(err, io.EOF) {
					// Normal end of stream
					return nil
				}
				return fmt.Errorf("error receiving from broker %s: %v", brokerHost, err)
			}

			// Forward the response to our client
			if sendErr := stream.Send(response); sendErr != nil {
				return fmt.Errorf("error forwarding response to client: %v", sendErr)
			}

			// Check if this is the end of stream
			if response.EndOfStream {
				return nil
			}
		}
	})
}