broker_grpc_lookup.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  13. )
  14. // LookupTopicBrokers returns the brokers that are serving the topic
  15. func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
  16. if !b.isLockOwner() {
  17. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  18. resp, err = client.LookupTopicBrokers(ctx, request)
  19. return nil
  20. })
  21. if proxyErr != nil {
  22. return nil, proxyErr
  23. }
  24. return resp, err
  25. }
  26. t := topic.FromPbTopic(request.Topic)
  27. ret := &mq_pb.LookupTopicBrokersResponse{}
  28. conf := &mq_pb.ConfigureTopicResponse{}
  29. ret.Topic = request.Topic
  30. if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
  31. glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
  32. } else {
  33. err = b.ensureTopicActiveAssignments(t, conf)
  34. ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
  35. }
  36. return ret, err
  37. }
  38. func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
  39. if !b.isLockOwner() {
  40. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  41. resp, err = client.ListTopics(ctx, request)
  42. return nil
  43. })
  44. if proxyErr != nil {
  45. return nil, proxyErr
  46. }
  47. return resp, err
  48. }
  49. ret := &mq_pb.ListTopicsResponse{}
  50. // Scan the filer directory structure to find all topics
  51. err = b.fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  52. // List all namespaces under /topics
  53. stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
  54. Directory: filer.TopicsDir,
  55. Limit: 1000,
  56. })
  57. if err != nil {
  58. glog.V(0).Infof("list namespaces in %s: %v", filer.TopicsDir, err)
  59. return err
  60. }
  61. // Process each namespace
  62. for {
  63. resp, err := stream.Recv()
  64. if err != nil {
  65. if err.Error() == "EOF" {
  66. break
  67. }
  68. return err
  69. }
  70. if !resp.Entry.IsDirectory {
  71. continue
  72. }
  73. namespaceName := resp.Entry.Name
  74. namespacePath := fmt.Sprintf("%s/%s", filer.TopicsDir, namespaceName)
  75. // List all topics in this namespace
  76. topicStream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
  77. Directory: namespacePath,
  78. Limit: 1000,
  79. })
  80. if err != nil {
  81. glog.V(0).Infof("list topics in namespace %s: %v", namespacePath, err)
  82. continue
  83. }
  84. // Process each topic in the namespace
  85. for {
  86. topicResp, err := topicStream.Recv()
  87. if err != nil {
  88. if err.Error() == "EOF" {
  89. break
  90. }
  91. glog.V(0).Infof("error reading topic stream in namespace %s: %v", namespaceName, err)
  92. break
  93. }
  94. if !topicResp.Entry.IsDirectory {
  95. continue
  96. }
  97. topicName := topicResp.Entry.Name
  98. // Check if topic.conf exists
  99. topicPath := fmt.Sprintf("%s/%s", namespacePath, topicName)
  100. confResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
  101. Directory: topicPath,
  102. Name: filer.TopicConfFile,
  103. })
  104. if err != nil {
  105. glog.V(0).Infof("lookup topic.conf in %s: %v", topicPath, err)
  106. continue
  107. }
  108. if confResp.Entry != nil {
  109. // This is a valid topic
  110. topic := &schema_pb.Topic{
  111. Namespace: namespaceName,
  112. Name: topicName,
  113. }
  114. ret.Topics = append(ret.Topics, topic)
  115. }
  116. }
  117. }
  118. return nil
  119. })
  120. if err != nil {
  121. glog.V(0).Infof("list topics from filer: %v", err)
  122. // Return empty response on error
  123. return &mq_pb.ListTopicsResponse{}, nil
  124. }
  125. return ret, nil
  126. }
  127. // GetTopicConfiguration returns the complete configuration of a topic including schema and partition assignments
  128. func (b *MessageQueueBroker) GetTopicConfiguration(ctx context.Context, request *mq_pb.GetTopicConfigurationRequest) (resp *mq_pb.GetTopicConfigurationResponse, err error) {
  129. if !b.isLockOwner() {
  130. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  131. resp, err = client.GetTopicConfiguration(ctx, request)
  132. return nil
  133. })
  134. if proxyErr != nil {
  135. return nil, proxyErr
  136. }
  137. return resp, err
  138. }
  139. t := topic.FromPbTopic(request.Topic)
  140. var conf *mq_pb.ConfigureTopicResponse
  141. var createdAtNs, modifiedAtNs int64
  142. if conf, createdAtNs, modifiedAtNs, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
  143. glog.V(0).Infof("get topic configuration %s: %v", request.Topic, err)
  144. return nil, fmt.Errorf("failed to read topic configuration: %w", err)
  145. }
  146. // Ensure topic assignments are active
  147. err = b.ensureTopicActiveAssignments(t, conf)
  148. if err != nil {
  149. glog.V(0).Infof("ensure topic active assignments %s: %v", request.Topic, err)
  150. return nil, fmt.Errorf("failed to ensure topic assignments: %w", err)
  151. }
  152. // Build the response with complete configuration including metadata
  153. ret := &mq_pb.GetTopicConfigurationResponse{
  154. Topic: request.Topic,
  155. PartitionCount: int32(len(conf.BrokerPartitionAssignments)),
  156. RecordType: conf.RecordType,
  157. BrokerPartitionAssignments: conf.BrokerPartitionAssignments,
  158. CreatedAtNs: createdAtNs,
  159. LastUpdatedNs: modifiedAtNs,
  160. Retention: conf.Retention,
  161. }
  162. return ret, nil
  163. }
  164. // GetTopicPublishers returns the active publishers for a topic
  165. func (b *MessageQueueBroker) GetTopicPublishers(ctx context.Context, request *mq_pb.GetTopicPublishersRequest) (resp *mq_pb.GetTopicPublishersResponse, err error) {
  166. if !b.isLockOwner() {
  167. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  168. resp, err = client.GetTopicPublishers(ctx, request)
  169. return nil
  170. })
  171. if proxyErr != nil {
  172. return nil, proxyErr
  173. }
  174. return resp, err
  175. }
  176. t := topic.FromPbTopic(request.Topic)
  177. var publishers []*mq_pb.TopicPublisher
  178. // Get topic configuration to find partition assignments
  179. var conf *mq_pb.ConfigureTopicResponse
  180. if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
  181. glog.V(0).Infof("get topic configuration for publishers %s: %v", request.Topic, err)
  182. return nil, fmt.Errorf("failed to read topic configuration: %w", err)
  183. }
  184. // Collect publishers from each partition that is hosted on this broker
  185. for _, assignment := range conf.BrokerPartitionAssignments {
  186. // Only collect from partitions where this broker is the leader
  187. if assignment.LeaderBroker == b.option.BrokerAddress().String() {
  188. partition := topic.FromPbPartition(assignment.Partition)
  189. if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
  190. // Get publisher information from local partition
  191. localPartition.Publishers.ForEachPublisher(func(clientName string, publisher *topic.LocalPublisher) {
  192. connectTimeNs, lastSeenTimeNs := publisher.GetTimestamps()
  193. lastPublishedOffset, lastAckedOffset := publisher.GetOffsets()
  194. publishers = append(publishers, &mq_pb.TopicPublisher{
  195. PublisherName: clientName,
  196. ClientId: clientName, // For now, client name is used as client ID
  197. Partition: assignment.Partition,
  198. ConnectTimeNs: connectTimeNs,
  199. LastSeenTimeNs: lastSeenTimeNs,
  200. Broker: assignment.LeaderBroker,
  201. IsActive: true,
  202. LastPublishedOffset: lastPublishedOffset,
  203. LastAckedOffset: lastAckedOffset,
  204. })
  205. })
  206. }
  207. }
  208. }
  209. return &mq_pb.GetTopicPublishersResponse{
  210. Publishers: publishers,
  211. }, nil
  212. }
  213. // GetTopicSubscribers returns the active subscribers for a topic
  214. func (b *MessageQueueBroker) GetTopicSubscribers(ctx context.Context, request *mq_pb.GetTopicSubscribersRequest) (resp *mq_pb.GetTopicSubscribersResponse, err error) {
  215. if !b.isLockOwner() {
  216. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  217. resp, err = client.GetTopicSubscribers(ctx, request)
  218. return nil
  219. })
  220. if proxyErr != nil {
  221. return nil, proxyErr
  222. }
  223. return resp, err
  224. }
  225. t := topic.FromPbTopic(request.Topic)
  226. var subscribers []*mq_pb.TopicSubscriber
  227. // Get topic configuration to find partition assignments
  228. var conf *mq_pb.ConfigureTopicResponse
  229. if conf, _, _, err = b.fca.ReadTopicConfFromFilerWithMetadata(t); err != nil {
  230. glog.V(0).Infof("get topic configuration for subscribers %s: %v", request.Topic, err)
  231. return nil, fmt.Errorf("failed to read topic configuration: %w", err)
  232. }
  233. // Collect subscribers from each partition that is hosted on this broker
  234. for _, assignment := range conf.BrokerPartitionAssignments {
  235. // Only collect from partitions where this broker is the leader
  236. if assignment.LeaderBroker == b.option.BrokerAddress().String() {
  237. partition := topic.FromPbPartition(assignment.Partition)
  238. if localPartition := b.localTopicManager.GetLocalPartition(t, partition); localPartition != nil {
  239. // Get subscriber information from local partition
  240. localPartition.Subscribers.ForEachSubscriber(func(clientName string, subscriber *topic.LocalSubscriber) {
  241. // Parse client name to extract consumer group and consumer ID
  242. // Format is typically: "consumerGroup/consumerID"
  243. consumerGroup := "default"
  244. consumerID := clientName
  245. if idx := strings.Index(clientName, "/"); idx != -1 {
  246. consumerGroup = clientName[:idx]
  247. consumerID = clientName[idx+1:]
  248. }
  249. connectTimeNs, lastSeenTimeNs := subscriber.GetTimestamps()
  250. lastReceivedOffset, lastAckedOffset := subscriber.GetOffsets()
  251. subscribers = append(subscribers, &mq_pb.TopicSubscriber{
  252. ConsumerGroup: consumerGroup,
  253. ConsumerId: consumerID,
  254. ClientId: clientName, // Full client name as client ID
  255. Partition: assignment.Partition,
  256. ConnectTimeNs: connectTimeNs,
  257. LastSeenTimeNs: lastSeenTimeNs,
  258. Broker: assignment.LeaderBroker,
  259. IsActive: true,
  260. CurrentOffset: lastAckedOffset, // for compatibility
  261. LastReceivedOffset: lastReceivedOffset,
  262. })
  263. })
  264. }
  265. }
  266. }
  267. return &mq_pb.GetTopicSubscribersResponse{
  268. Subscribers: subscribers,
  269. }, nil
  270. }
  271. func (b *MessageQueueBroker) isLockOwner() bool {
  272. return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
  273. }