| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- package broker
- import (
- "context"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "strings"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- )
// BufferRange is an inclusive [start, end] span of log buffer indexes whose
// contents have already been flushed to disk.
type BufferRange struct {
	start, end int64
}
// ErrNoPartitionAssignment is the sentinel returned when the broker lookup
// yields no leader broker for the requested partition. It is an expected
// outcome rather than a failure: callers interpret it as "there are no
// unflushed messages for this partition".
var ErrNoPartitionAssignment = errors.New("no broker assignment found for partition")
- // GetUnflushedMessages returns messages from the broker's in-memory LogBuffer
- // that haven't been flushed to disk yet, using buffer_start metadata for deduplication
- // Now supports streaming responses and buffer index filtering for better performance
- // Includes broker routing to redirect requests to the correct broker hosting the topic/partition
- func (b *MessageQueueBroker) GetUnflushedMessages(req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
- // Convert protobuf types to internal types
- t := topic.FromPbTopic(req.Topic)
- partition := topic.FromPbPartition(req.Partition)
- glog.V(2).Infof("GetUnflushedMessages request for %v %v", t, partition)
- // Get the local partition for this topic/partition
- b.accessLock.Lock()
- localPartition := b.localTopicManager.GetLocalPartition(t, partition)
- b.accessLock.Unlock()
- if localPartition == nil {
- // Topic/partition not found locally, attempt to find the correct broker and redirect
- glog.V(1).Infof("Topic/partition %v %v not found locally, looking up broker", t, partition)
- // Look up which broker hosts this topic/partition
- brokerHost, err := b.findBrokerForTopicPartition(req.Topic, req.Partition)
- if err != nil {
- if errors.Is(err, ErrNoPartitionAssignment) {
- // Normal case: no broker assignment means no unflushed messages
- glog.V(2).Infof("No broker assignment for %v %v - no unflushed messages", t, partition)
- return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- EndOfStream: true,
- })
- }
- return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- Error: fmt.Sprintf("failed to find broker for %v %v: %v", t, partition, err),
- EndOfStream: true,
- })
- }
- if brokerHost == "" {
- // This should not happen after ErrNoPartitionAssignment check, but keep for safety
- glog.V(2).Infof("Empty broker host for %v %v - no unflushed messages", t, partition)
- return stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- EndOfStream: true,
- })
- }
- // Redirect to the correct broker
- glog.V(1).Infof("Redirecting GetUnflushedMessages request for %v %v to broker %s", t, partition, brokerHost)
- return b.redirectGetUnflushedMessages(brokerHost, req, stream)
- }
- // Build deduplication map from existing log files using buffer_start metadata
- partitionDir := topic.PartitionDir(t, partition)
- flushedBufferRanges, err := b.buildBufferStartDeduplicationMap(partitionDir)
- if err != nil {
- glog.Errorf("Failed to build deduplication map for %v %v: %v", t, partition, err)
- // Continue with empty map - better to potentially duplicate than to miss data
- flushedBufferRanges = make([]BufferRange, 0)
- }
- // Use buffer_start index for precise deduplication
- lastFlushTsNs := localPartition.LogBuffer.LastFlushTsNs
- startBufferIndex := req.StartBufferIndex
- startTimeNs := lastFlushTsNs // Still respect last flush time for safety
- glog.V(2).Infof("Streaming unflushed messages for %v %v, buffer >= %d, timestamp >= %d (safety), excluding %d flushed buffer ranges",
- t, partition, startBufferIndex, startTimeNs, len(flushedBufferRanges))
- // Stream messages from LogBuffer with filtering
- messageCount := 0
- startPosition := log_buffer.NewMessagePosition(startTimeNs, startBufferIndex)
- // Use the new LoopProcessLogDataWithBatchIndex method to avoid code duplication
- _, _, err = localPartition.LogBuffer.LoopProcessLogDataWithBatchIndex(
- "GetUnflushedMessages",
- startPosition,
- 0, // stopTsNs = 0 means process all available data
- func() bool { return false }, // waitForDataFn = false means don't wait for new data
- func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) {
- // Apply buffer index filtering if specified
- if startBufferIndex > 0 && batchIndex < startBufferIndex {
- glog.V(3).Infof("Skipping message from buffer index %d (< %d)", batchIndex, startBufferIndex)
- return false, nil
- }
- // Check if this message is from a buffer range that's already been flushed
- if b.isBufferIndexFlushed(batchIndex, flushedBufferRanges) {
- glog.V(3).Infof("Skipping message from flushed buffer index %d", batchIndex)
- return false, nil
- }
- // Stream this message
- err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- Message: &mq_pb.LogEntry{
- TsNs: logEntry.TsNs,
- Key: logEntry.Key,
- Data: logEntry.Data,
- PartitionKeyHash: uint32(logEntry.PartitionKeyHash),
- },
- EndOfStream: false,
- })
- if err != nil {
- glog.Errorf("Failed to stream message: %v", err)
- return true, err // isDone = true to stop processing
- }
- messageCount++
- return false, nil // Continue processing
- },
- )
- // Handle collection errors
- if err != nil && err != log_buffer.ResumeFromDiskError {
- streamErr := stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- Error: fmt.Sprintf("failed to stream unflushed messages: %v", err),
- EndOfStream: true,
- })
- if streamErr != nil {
- glog.Errorf("Failed to send error response: %v", streamErr)
- }
- return err
- }
- // Send end-of-stream marker
- err = stream.Send(&mq_pb.GetUnflushedMessagesResponse{
- EndOfStream: true,
- })
- if err != nil {
- glog.Errorf("Failed to send end-of-stream marker: %v", err)
- return err
- }
- glog.V(1).Infof("Streamed %d unflushed messages for %v %v", messageCount, t, partition)
- return nil
- }
- // buildBufferStartDeduplicationMap scans log files to build a map of buffer ranges
- // that have been flushed to disk, using the buffer_start metadata
- func (b *MessageQueueBroker) buildBufferStartDeduplicationMap(partitionDir string) ([]BufferRange, error) {
- var flushedRanges []BufferRange
- // List all files in the partition directory using filer client accessor
- // Use pagination to handle directories with more than 1000 files
- err := b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- var lastFileName string
- var hasMore = true
- for hasMore {
- var currentBatchProcessed int
- err := filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
- currentBatchProcessed++
- hasMore = !isLast // If this is the last entry of a full batch, there might be more
- lastFileName = entry.Name
- if entry.IsDirectory {
- return nil
- }
- // Skip Parquet files - they don't represent buffer ranges
- if strings.HasSuffix(entry.Name, ".parquet") {
- return nil
- }
- // Skip offset files
- if strings.HasSuffix(entry.Name, ".offset") {
- return nil
- }
- // Get buffer start for this file
- bufferStart, err := b.getLogBufferStartFromFile(entry)
- if err != nil {
- glog.V(2).Infof("Failed to get buffer start from file %s: %v", entry.Name, err)
- return nil // Continue with other files
- }
- if bufferStart == nil {
- // File has no buffer metadata - skip deduplication for this file
- glog.V(2).Infof("File %s has no buffer_start metadata", entry.Name)
- return nil
- }
- // Calculate the buffer range covered by this file
- chunkCount := int64(len(entry.GetChunks()))
- if chunkCount > 0 {
- fileRange := BufferRange{
- start: bufferStart.StartIndex,
- end: bufferStart.StartIndex + chunkCount - 1,
- }
- flushedRanges = append(flushedRanges, fileRange)
- glog.V(3).Infof("File %s covers buffer range [%d-%d]", entry.Name, fileRange.start, fileRange.end)
- }
- return nil
- }, lastFileName, false, 1000) // Start from last processed file name for next batch
- if err != nil {
- return err
- }
- // If we processed fewer than 1000 entries, we've reached the end
- if currentBatchProcessed < 1000 {
- hasMore = false
- }
- }
- return nil
- })
- if err != nil {
- return flushedRanges, fmt.Errorf("failed to list partition directory %s: %v", partitionDir, err)
- }
- return flushedRanges, nil
- }
- // getLogBufferStartFromFile extracts LogBufferStart metadata from a log file
- func (b *MessageQueueBroker) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
- if entry.Extended == nil {
- return nil, nil
- }
- // Only support binary buffer_start format
- if startData, exists := entry.Extended["buffer_start"]; exists {
- if len(startData) == 8 {
- startIndex := int64(binary.BigEndian.Uint64(startData))
- if startIndex > 0 {
- return &LogBufferStart{StartIndex: startIndex}, nil
- }
- } else {
- return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
- }
- }
- return nil, nil
- }
- // isBufferIndexFlushed checks if a buffer index is covered by any of the flushed ranges
- func (b *MessageQueueBroker) isBufferIndexFlushed(bufferIndex int64, flushedRanges []BufferRange) bool {
- for _, flushedRange := range flushedRanges {
- if bufferIndex >= flushedRange.start && bufferIndex <= flushedRange.end {
- return true
- }
- }
- return false
- }
- // findBrokerForTopicPartition finds which broker hosts the specified topic/partition
- func (b *MessageQueueBroker) findBrokerForTopicPartition(topic *schema_pb.Topic, partition *schema_pb.Partition) (string, error) {
- // Use LookupTopicBrokers to find which broker hosts this topic/partition
- ctx := context.Background()
- lookupReq := &mq_pb.LookupTopicBrokersRequest{
- Topic: topic,
- }
- // If we're not the lock owner (balancer), we need to redirect to the balancer first
- var lookupResp *mq_pb.LookupTopicBrokersResponse
- var err error
- if !b.isLockOwner() {
- // Redirect to balancer to get topic broker assignments
- balancerAddress := pb.ServerAddress(b.lockAsBalancer.LockOwner())
- err = b.withBrokerClient(false, balancerAddress, func(client mq_pb.SeaweedMessagingClient) error {
- lookupResp, err = client.LookupTopicBrokers(ctx, lookupReq)
- return err
- })
- } else {
- // We are the balancer, handle the lookup directly
- lookupResp, err = b.LookupTopicBrokers(ctx, lookupReq)
- }
- if err != nil {
- return "", fmt.Errorf("failed to lookup topic brokers: %v", err)
- }
- // Find the broker assignment that matches our partition
- for _, assignment := range lookupResp.BrokerPartitionAssignments {
- if b.partitionsMatch(partition, assignment.Partition) {
- if assignment.LeaderBroker != "" {
- return assignment.LeaderBroker, nil
- }
- }
- }
- return "", ErrNoPartitionAssignment
- }
- // partitionsMatch checks if two partitions represent the same partition
- func (b *MessageQueueBroker) partitionsMatch(p1, p2 *schema_pb.Partition) bool {
- return p1.RingSize == p2.RingSize &&
- p1.RangeStart == p2.RangeStart &&
- p1.RangeStop == p2.RangeStop &&
- p1.UnixTimeNs == p2.UnixTimeNs
- }
- // redirectGetUnflushedMessages forwards the GetUnflushedMessages request to the correct broker
- func (b *MessageQueueBroker) redirectGetUnflushedMessages(brokerHost string, req *mq_pb.GetUnflushedMessagesRequest, stream mq_pb.SeaweedMessaging_GetUnflushedMessagesServer) error {
- ctx := stream.Context()
- // Connect to the target broker and forward the request
- return b.withBrokerClient(false, pb.ServerAddress(brokerHost), func(client mq_pb.SeaweedMessagingClient) error {
- // Create a new stream to the target broker
- targetStream, err := client.GetUnflushedMessages(ctx, req)
- if err != nil {
- return fmt.Errorf("failed to create stream to broker %s: %v", brokerHost, err)
- }
- // Forward all responses from the target broker to our client
- for {
- response, err := targetStream.Recv()
- if err != nil {
- if errors.Is(err, io.EOF) {
- // Normal end of stream
- return nil
- }
- return fmt.Errorf("error receiving from broker %s: %v", brokerHost, err)
- }
- // Forward the response to our client
- if sendErr := stream.Send(response); sendErr != nil {
- return fmt.Errorf("error forwarding response to client: %v", sendErr)
- }
- // Check if this is the end of stream
- if response.EndOfStream {
- return nil
- }
- }
- })
- }
|