| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- package broker
- import (
- "context"
- "fmt"
- "strings"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- )
- // LookupTopicBrokers returns the brokers that are serving the topic
- func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
- if !b.isLockOwner() {
- proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.LookupTopicBrokers(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
- t := topic.FromPbTopic(request.Topic)
- ret := &mq_pb.LookupTopicBrokersResponse{}
- conf := &mq_pb.ConfigureTopicResponse{}
- ret.Topic = request.Topic
- if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
- glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
- } else {
- err = b.ensureTopicActiveAssignments(t, conf)
- ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
- }
- return ret, err
- }
- func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
- if !b.isLockOwner() {
- proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.ListTopics(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
- ret := &mq_pb.ListTopicsResponse{}
- // Scan the filer directory structure to find all topics
- err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // List all namespaces under /topics
- stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: filer.TopicsDir,
- Limit: 1000,
- })
- if err != nil {
- glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
- return err
- }
- // Process each namespace
- for {
- resp, err := stream.Recv()
- if err != nil {
- if err.Error() == "EOF" {
- break
- }
- return err
- }
- if !resp.Entry.IsDirectory {
- continue
- }
- namespaceName := resp.Entry.Name
- namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
- // List all topics in this namespace
- topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
- Directory: namespacePath,
- Limit: 1000,
- })
- if err != nil {
- glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err)
- continue
- }
- // Process each topic in the namespace
- for {
- topicResp, err := topicStream.Recv()
- if err != nil {
- if err.Error() == "EOF" {
- break
- }
- glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err)
- break
- }
- if !topicResp.Entry.IsDirectory {
- continue
- }
- topicName := topicResp.Entry.Name
- // Check if topic.conf exists
- topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
- confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
- Directory: topicPath,
- Name: filer.TopicConfFile,
- })
- if err != nil {
- glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err)
- continue
- }
- if confResp.Entry != nil {
- // This is a valid topic
- topic := &schema_pb.Topic{
- Namespace: namespaceName,
- Name: topicName,
- }
- ret.Topics = append(ret.Topics, topic)
- }
- }
- }
- return nil
- })
- if err != nil {
- glog.V(0).Infof("list topics from filer: %v", err)
- // Return empty response on error
- return &mq_pb.ListTopicsResponse{}, nil
- }
- return ret, nil
- }
- // GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
- func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
- if !b.isLockOwner() {
- proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.GetTopicConfiguration(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
- t := topic.FromPbTopic(request.Topic)
- var conf *mq_pb.ConfigureTopicResponse
- var createdAtNs, modifiedAtNs int64
- if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
- glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
- return nil, fmt.Errorf("failed to read topic configuration: %w", err)
- }
- // Ensure topic assignments are active
- err = b.ensureTopicActiveAssignments(t, conf)
- if err != nil {
- glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
- return nil, fmt.Errorf("failed to ensure topic assignments: %w", err)
- }
- // Build the response with complete configuration including metadata
- ret := &mq_pb.GetTopicConfigurationResponse{
- Topic: request.Topic,
- PartitionCount: int32(len(conf.BrokerPartitionAssignments)),
- RecordType: conf.RecordType,
- BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
- CreatedAtNs: createdAtNs,
- LastUpdatedNs: modifiedAtNs,
- Retention: conf.Retention,
- }
- return ret, nil
- }
- // GetTopicPublishers returns the active publishers for a topic
- func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) {
- if !b.isLockOwner() {
- proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.GetTopicPublishers(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
- t := topic.FromPbTopic(request.Topic)
- var publishers []*mq_pb.TopicPublisher
- // Get topic configuration to find partition assignments
- var conf *mq_pb.ConfigureTopicResponse
- if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
- glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
- return nil, fmt.Errorf("failed to read topic configuration: %w", err)
- }
- // Collect publishers from each partition that is hosted on this broker
- for _, assignment := range conf.BrokerPartitionAssignments {
- // Only collect from partitions where this broker is the leader
- if assignment.LeaderBroker == b.option.BrokerAddress().String() {
- partition := topic.FromPbPartition(assignment.Partition)
- if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
- // Get publisher information from local partition
- localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) {
- connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps()
- lastPublishedOffset, lastAckedOffset := publisher.GetOffsets()
- publishers = append(publishers, &mq_pb.TopicPublisher{
- PublisherName: clientName,
- ClientId: clientName, // For now, client name is used as client ID
- Partition: assignment.Partition,
- ConnectTimeNs: connectTimeNs,
- LastSeenTimeNs: lastSeenTimeNs,
- Broker: assignment.LeaderBroker,
- IsActive: true,
- LastPublishedOffset: lastPublishedOffset,
- LastAckedOffset: lastAckedOffset,
- })
- })
- }
- }
- }
- return &mq_pb.GetTopicPublishersResponse{
- Publishers: publishers,
- }, nil
- }
- // GetTopicSubscribers returns the active subscribers for a topic
- func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) {
- if !b.isLockOwner() {
- proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.GetTopicSubscribers(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
- t := topic.FromPbTopic(request.Topic)
- var subscribers []*mq_pb.TopicSubscriber
- // Get topic configuration to find partition assignments
- var conf *mq_pb.ConfigureTopicResponse
- if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
- glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
- return nil, fmt.Errorf("failed to read topic configuration: %w", err)
- }
- // Collect subscribers from each partition that is hosted on this broker
- for _, assignment := range conf.BrokerPartitionAssignments {
- // Only collect from partitions where this broker is the leader
- if assignment.LeaderBroker == b.option.BrokerAddress().String() {
- partition := topic.FromPbPartition(assignment.Partition)
- if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
- // Get subscriber information from local partition
- localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) {
- // Parse client name to extract consumer group and consumer ID
- // Format is typically: "consumerGroup/consumerID"
- consumerGroup := "default"
- consumerID := clientName
- if idx := strings.Index(clientName, "/"); idx != -1 {
- consumerGroup = clientName[:idx]
- consumerID = clientName[idx+1:]
- }
- connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps()
- lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets()
- subscribers = append(subscribers, &mq_pb.TopicSubscriber{
- ConsumerGroup: consumerGroup,
- ConsumerId: consumerID,
- ClientId: clientName, // Full client name as client ID
- Partition: assignment.Partition,
- ConnectTimeNs: connectTimeNs,
- LastSeenTimeNs: lastSeenTimeNs,
- Broker: assignment.LeaderBroker,
- IsActive: true,
- CurrentOffset: lastAckedOffset, // for compatibility
- LastReceivedOffset: lastReceivedOffset,
- })
- })
- }
- }
- }
- return &mq_pb.GetTopicSubscribersResponse{
- Subscribers: subscribers,
- }, nil
- }
- func (b *MessageQueueBroker) isLockOwner() bool {
- return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
- }
|