mq_management.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. package dash
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "path/filepath"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/cluster"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. )
  20. // GetTopics retrieves message queue topics data
  21. func (s *AdminServer) GetTopics() (*TopicsData, error) {
  22. var topics []TopicInfo
  23. // Find broker leader and get topics
  24. brokerLeader, err := s.findBrokerLeader()
  25. if err != nil {
  26. // If no broker leader found, return empty data
  27. return &TopicsData{
  28. Topics: topics,
  29. TotalTopics: len(topics),
  30. LastUpdated: time.Now(),
  31. }, nil
  32. }
  33. // Connect to broker leader and list topics
  34. err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
  35. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  36. defer cancel()
  37. resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
  38. if err != nil {
  39. return err
  40. }
  41. // Convert protobuf topics to TopicInfo - only include available data
  42. for _, pbTopic := range resp.Topics {
  43. topicInfo := TopicInfo{
  44. Name: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
  45. Partitions: 0, // Will be populated by LookupTopicBrokers call
  46. Retention: TopicRetentionInfo{
  47. Enabled: false,
  48. DisplayValue: 0,
  49. DisplayUnit: "days",
  50. },
  51. }
  52. // Get topic configuration to get partition count and retention info
  53. lookupResp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{
  54. Topic: pbTopic,
  55. })
  56. if err == nil {
  57. topicInfo.Partitions = len(lookupResp.BrokerPartitionAssignments)
  58. }
  59. // Get topic configuration for retention information
  60. configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
  61. Topic: pbTopic,
  62. })
  63. if err == nil && configResp.Retention != nil {
  64. topicInfo.Retention = convertTopicRetention(configResp.Retention)
  65. }
  66. topics = append(topics, topicInfo)
  67. }
  68. return nil
  69. })
  70. if err != nil {
  71. // If connection fails, return empty data
  72. return &TopicsData{
  73. Topics: topics,
  74. TotalTopics: len(topics),
  75. LastUpdated: time.Now(),
  76. }, nil
  77. }
  78. return &TopicsData{
  79. Topics: topics,
  80. TotalTopics: len(topics),
  81. LastUpdated: time.Now(),
  82. // Don't include TotalMessages and TotalSize as they're not available
  83. }, nil
  84. }
  85. // GetSubscribers retrieves message queue subscribers data
  86. func (s *AdminServer) GetSubscribers() (*SubscribersData, error) {
  87. var subscribers []SubscriberInfo
  88. // Find broker leader and get subscriber info from broker stats
  89. brokerLeader, err := s.findBrokerLeader()
  90. if err != nil {
  91. // If no broker leader found, return empty data
  92. return &SubscribersData{
  93. Subscribers: subscribers,
  94. TotalSubscribers: len(subscribers),
  95. ActiveSubscribers: 0,
  96. LastUpdated: time.Now(),
  97. }, nil
  98. }
  99. // Connect to broker leader and get subscriber information
  100. // Note: SeaweedMQ doesn't have a direct API to list all subscribers
  101. // We would need to collect this information from broker statistics
  102. // For now, return empty data structure as subscriber info is not
  103. // directly available through the current MQ API
  104. err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
  105. // TODO: Implement subscriber data collection from broker statistics
  106. // This would require access to broker internal statistics about
  107. // active subscribers, consumer groups, etc.
  108. return nil
  109. })
  110. if err != nil {
  111. // If connection fails, return empty data
  112. return &SubscribersData{
  113. Subscribers: subscribers,
  114. TotalSubscribers: len(subscribers),
  115. ActiveSubscribers: 0,
  116. LastUpdated: time.Now(),
  117. }, nil
  118. }
  119. activeCount := 0
  120. for _, sub := range subscribers {
  121. if sub.Status == "active" {
  122. activeCount++
  123. }
  124. }
  125. return &SubscribersData{
  126. Subscribers: subscribers,
  127. TotalSubscribers: len(subscribers),
  128. ActiveSubscribers: activeCount,
  129. LastUpdated: time.Now(),
  130. }, nil
  131. }
  132. // GetTopicDetails retrieves detailed information about a specific topic
  133. func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetailsData, error) {
  134. // Find broker leader
  135. brokerLeader, err := s.findBrokerLeader()
  136. if err != nil {
  137. return nil, fmt.Errorf("failed to find broker leader: %w", err)
  138. }
  139. var topicDetails *TopicDetailsData
  140. // Connect to broker leader and get topic configuration
  141. err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
  142. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  143. defer cancel()
  144. // Get topic configuration using the new API
  145. configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
  146. Topic: &schema_pb.Topic{
  147. Namespace: namespace,
  148. Name: topicName,
  149. },
  150. })
  151. if err != nil {
  152. return fmt.Errorf("failed to get topic configuration: %w", err)
  153. }
  154. // Initialize topic details
  155. topicDetails = &TopicDetailsData{
  156. TopicName: fmt.Sprintf("%s.%s", namespace, topicName),
  157. Namespace: namespace,
  158. Name: topicName,
  159. Partitions: []PartitionInfo{},
  160. Schema: []SchemaFieldInfo{},
  161. Publishers: []PublisherInfo{},
  162. Subscribers: []TopicSubscriberInfo{},
  163. ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
  164. Retention: convertTopicRetention(configResp.Retention),
  165. CreatedAt: time.Unix(0, configResp.CreatedAtNs),
  166. LastUpdated: time.Unix(0, configResp.LastUpdatedNs),
  167. }
  168. // Set current time if timestamps are not available
  169. if configResp.CreatedAtNs == 0 {
  170. topicDetails.CreatedAt = time.Now()
  171. }
  172. if configResp.LastUpdatedNs == 0 {
  173. topicDetails.LastUpdated = time.Now()
  174. }
  175. // Process partitions
  176. for _, assignment := range configResp.BrokerPartitionAssignments {
  177. if assignment.Partition != nil {
  178. partitionInfo := PartitionInfo{
  179. ID: assignment.Partition.RangeStart,
  180. LeaderBroker: assignment.LeaderBroker,
  181. FollowerBroker: assignment.FollowerBroker,
  182. MessageCount: 0, // Will be enhanced later with actual stats
  183. TotalSize: 0, // Will be enhanced later with actual stats
  184. LastDataTime: time.Time{}, // Will be enhanced later
  185. CreatedAt: time.Now(),
  186. }
  187. topicDetails.Partitions = append(topicDetails.Partitions, partitionInfo)
  188. }
  189. }
  190. // Process schema from RecordType
  191. if configResp.RecordType != nil {
  192. topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
  193. }
  194. // Get publishers information
  195. publishersResp, err := client.GetTopicPublishers(ctx, &mq_pb.GetTopicPublishersRequest{
  196. Topic: &schema_pb.Topic{
  197. Namespace: namespace,
  198. Name: topicName,
  199. },
  200. })
  201. if err != nil {
  202. // Log error but don't fail the entire request
  203. glog.V(0).Infof("failed to get topic publishers for %s.%s: %v", namespace, topicName, err)
  204. } else {
  205. glog.V(1).Infof("got %d publishers for topic %s.%s", len(publishersResp.Publishers), namespace, topicName)
  206. topicDetails.Publishers = convertTopicPublishers(publishersResp.Publishers)
  207. }
  208. // Get subscribers information
  209. subscribersResp, err := client.GetTopicSubscribers(ctx, &mq_pb.GetTopicSubscribersRequest{
  210. Topic: &schema_pb.Topic{
  211. Namespace: namespace,
  212. Name: topicName,
  213. },
  214. })
  215. if err != nil {
  216. // Log error but don't fail the entire request
  217. glog.V(0).Infof("failed to get topic subscribers for %s.%s: %v", namespace, topicName, err)
  218. } else {
  219. glog.V(1).Infof("got %d subscribers for topic %s.%s", len(subscribersResp.Subscribers), namespace, topicName)
  220. topicDetails.Subscribers = convertTopicSubscribers(subscribersResp.Subscribers)
  221. }
  222. return nil
  223. })
  224. if err != nil {
  225. return nil, err
  226. }
  227. // Get consumer group offsets from the filer
  228. offsets, err := s.GetConsumerGroupOffsets(namespace, topicName)
  229. if err != nil {
  230. // Log error but don't fail the entire request
  231. glog.V(0).Infof("failed to get consumer group offsets for %s.%s: %v", namespace, topicName, err)
  232. } else {
  233. glog.V(1).Infof("got %d consumer group offsets for topic %s.%s", len(offsets), namespace, topicName)
  234. topicDetails.ConsumerGroupOffsets = offsets
  235. }
  236. return topicDetails, nil
  237. }
  238. // GetConsumerGroupOffsets retrieves consumer group offsets for a topic from the filer
  239. func (s *AdminServer) GetConsumerGroupOffsets(namespace, topicName string) ([]ConsumerGroupOffsetInfo, error) {
  240. var offsets []ConsumerGroupOffsetInfo
  241. err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  242. // Get the topic directory: /topics/namespace/topicName
  243. topicObj := topic.NewTopic(namespace, topicName)
  244. topicDir := topicObj.Dir()
  245. // List all version directories under the topic directory (e.g., v2025-07-10-05-44-34)
  246. versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  247. Directory: topicDir,
  248. Prefix: "",
  249. StartFromFileName: "",
  250. InclusiveStartFrom: false,
  251. Limit: 1000,
  252. })
  253. if err != nil {
  254. return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
  255. }
  256. // Process each version directory
  257. for {
  258. versionResp, err := versionStream.Recv()
  259. if err != nil {
  260. if err == io.EOF {
  261. break
  262. }
  263. return fmt.Errorf("failed to receive version entries: %w", err)
  264. }
  265. // Only process directories that are versions (start with "v")
  266. if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
  267. versionDir := filepath.Join(topicDir, versionResp.Entry.Name)
  268. // List all partition directories under the version directory (e.g., 0315-0630)
  269. partitionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  270. Directory: versionDir,
  271. Prefix: "",
  272. StartFromFileName: "",
  273. InclusiveStartFrom: false,
  274. Limit: 1000,
  275. })
  276. if err != nil {
  277. glog.Warningf("Failed to list version directory %s: %v", versionDir, err)
  278. continue
  279. }
  280. // Process each partition directory
  281. for {
  282. partitionResp, err := partitionStream.Recv()
  283. if err != nil {
  284. if err == io.EOF {
  285. break
  286. }
  287. glog.Warningf("Failed to receive partition entries: %v", err)
  288. break
  289. }
  290. // Only process directories that are partitions (format: NNNN-NNNN)
  291. if partitionResp.Entry.IsDirectory {
  292. // Parse partition range to get partition start ID (e.g., "0315-0630" -> 315)
  293. var partitionStart, partitionStop int32
  294. if n, err := fmt.Sscanf(partitionResp.Entry.Name, "%04d-%04d", &partitionStart, &partitionStop); n != 2 || err != nil {
  295. // Skip directories that don't match the partition format
  296. continue
  297. }
  298. partitionDir := filepath.Join(versionDir, partitionResp.Entry.Name)
  299. // List all .offset files in this partition directory
  300. offsetStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  301. Directory: partitionDir,
  302. Prefix: "",
  303. StartFromFileName: "",
  304. InclusiveStartFrom: false,
  305. Limit: 1000,
  306. })
  307. if err != nil {
  308. glog.Warningf("Failed to list partition directory %s: %v", partitionDir, err)
  309. continue
  310. }
  311. // Process each offset file
  312. for {
  313. offsetResp, err := offsetStream.Recv()
  314. if err != nil {
  315. if err == io.EOF {
  316. break
  317. }
  318. glog.Warningf("Failed to receive offset entries: %v", err)
  319. break
  320. }
  321. // Only process .offset files
  322. if !offsetResp.Entry.IsDirectory && strings.HasSuffix(offsetResp.Entry.Name, ".offset") {
  323. consumerGroup := strings.TrimSuffix(offsetResp.Entry.Name, ".offset")
  324. // Read the offset value from the file
  325. offsetData, err := filer.ReadInsideFiler(client, partitionDir, offsetResp.Entry.Name)
  326. if err != nil {
  327. glog.Warningf("Failed to read offset file %s: %v", offsetResp.Entry.Name, err)
  328. continue
  329. }
  330. if len(offsetData) == 8 {
  331. offset := int64(util.BytesToUint64(offsetData))
  332. // Get the file modification time
  333. lastUpdated := time.Unix(offsetResp.Entry.Attributes.Mtime, 0)
  334. offsets = append(offsets, ConsumerGroupOffsetInfo{
  335. ConsumerGroup: consumerGroup,
  336. PartitionID: partitionStart, // Use partition start as the ID
  337. Offset: offset,
  338. LastUpdated: lastUpdated,
  339. })
  340. }
  341. }
  342. }
  343. }
  344. }
  345. }
  346. }
  347. return nil
  348. })
  349. if err != nil {
  350. return nil, fmt.Errorf("failed to get consumer group offsets: %w", err)
  351. }
  352. return offsets, nil
  353. }
  354. // convertRecordTypeToSchemaFields converts a protobuf RecordType to SchemaFieldInfo slice
  355. func convertRecordTypeToSchemaFields(recordType *schema_pb.RecordType) []SchemaFieldInfo {
  356. var schemaFields []SchemaFieldInfo
  357. if recordType == nil || recordType.Fields == nil {
  358. return schemaFields
  359. }
  360. for _, field := range recordType.Fields {
  361. schemaField := SchemaFieldInfo{
  362. Name: field.Name,
  363. Type: getFieldTypeString(field.Type),
  364. Required: field.IsRequired,
  365. }
  366. schemaFields = append(schemaFields, schemaField)
  367. }
  368. return schemaFields
  369. }
  370. // getFieldTypeString converts a protobuf Type to a human-readable string
  371. func getFieldTypeString(fieldType *schema_pb.Type) string {
  372. if fieldType == nil {
  373. return "unknown"
  374. }
  375. switch kind := fieldType.Kind.(type) {
  376. case *schema_pb.Type_ScalarType:
  377. return getScalarTypeString(kind.ScalarType)
  378. case *schema_pb.Type_RecordType:
  379. return "record"
  380. case *schema_pb.Type_ListType:
  381. elementType := getFieldTypeString(kind.ListType.ElementType)
  382. return fmt.Sprintf("list<%s>", elementType)
  383. default:
  384. return "unknown"
  385. }
  386. }
  387. // getScalarTypeString converts a protobuf ScalarType to a string
  388. func getScalarTypeString(scalarType schema_pb.ScalarType) string {
  389. switch scalarType {
  390. case schema_pb.ScalarType_BOOL:
  391. return "bool"
  392. case schema_pb.ScalarType_INT32:
  393. return "int32"
  394. case schema_pb.ScalarType_INT64:
  395. return "int64"
  396. case schema_pb.ScalarType_FLOAT:
  397. return "float"
  398. case schema_pb.ScalarType_DOUBLE:
  399. return "double"
  400. case schema_pb.ScalarType_BYTES:
  401. return "bytes"
  402. case schema_pb.ScalarType_STRING:
  403. return "string"
  404. default:
  405. return "unknown"
  406. }
  407. }
  408. // convertTopicPublishers converts protobuf TopicPublisher slice to PublisherInfo slice
  409. func convertTopicPublishers(publishers []*mq_pb.TopicPublisher) []PublisherInfo {
  410. publisherInfos := make([]PublisherInfo, 0, len(publishers))
  411. for _, publisher := range publishers {
  412. publisherInfo := PublisherInfo{
  413. PublisherName: publisher.PublisherName,
  414. ClientID: publisher.ClientId,
  415. PartitionID: publisher.Partition.RangeStart,
  416. Broker: publisher.Broker,
  417. IsActive: publisher.IsActive,
  418. LastPublishedOffset: publisher.LastPublishedOffset,
  419. LastAckedOffset: publisher.LastAckedOffset,
  420. }
  421. // Convert timestamps
  422. if publisher.ConnectTimeNs > 0 {
  423. publisherInfo.ConnectTime = time.Unix(0, publisher.ConnectTimeNs)
  424. }
  425. if publisher.LastSeenTimeNs > 0 {
  426. publisherInfo.LastSeenTime = time.Unix(0, publisher.LastSeenTimeNs)
  427. }
  428. publisherInfos = append(publisherInfos, publisherInfo)
  429. }
  430. return publisherInfos
  431. }
  432. // convertTopicSubscribers converts protobuf TopicSubscriber slice to TopicSubscriberInfo slice
  433. func convertTopicSubscribers(subscribers []*mq_pb.TopicSubscriber) []TopicSubscriberInfo {
  434. subscriberInfos := make([]TopicSubscriberInfo, 0, len(subscribers))
  435. for _, subscriber := range subscribers {
  436. subscriberInfo := TopicSubscriberInfo{
  437. ConsumerGroup: subscriber.ConsumerGroup,
  438. ConsumerID: subscriber.ConsumerId,
  439. ClientID: subscriber.ClientId,
  440. PartitionID: subscriber.Partition.RangeStart,
  441. Broker: subscriber.Broker,
  442. IsActive: subscriber.IsActive,
  443. CurrentOffset: subscriber.CurrentOffset,
  444. LastReceivedOffset: subscriber.LastReceivedOffset,
  445. }
  446. // Convert timestamps
  447. if subscriber.ConnectTimeNs > 0 {
  448. subscriberInfo.ConnectTime = time.Unix(0, subscriber.ConnectTimeNs)
  449. }
  450. if subscriber.LastSeenTimeNs > 0 {
  451. subscriberInfo.LastSeenTime = time.Unix(0, subscriber.LastSeenTimeNs)
  452. }
  453. subscriberInfos = append(subscriberInfos, subscriberInfo)
  454. }
  455. return subscriberInfos
  456. }
  457. // findBrokerLeader finds the current broker leader
  458. func (s *AdminServer) findBrokerLeader() (string, error) {
  459. // First, try to find any broker from the cluster
  460. var brokers []string
  461. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  462. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  463. ClientType: cluster.BrokerType,
  464. })
  465. if err != nil {
  466. return err
  467. }
  468. for _, node := range resp.ClusterNodes {
  469. brokers = append(brokers, node.Address)
  470. }
  471. return nil
  472. })
  473. if err != nil {
  474. return "", fmt.Errorf("failed to list brokers: %w", err)
  475. }
  476. if len(brokers) == 0 {
  477. return "", fmt.Errorf("no brokers found in cluster")
  478. }
  479. // Try each broker to find the leader
  480. for _, brokerAddr := range brokers {
  481. err := s.withBrokerClient(brokerAddr, func(client mq_pb.SeaweedMessagingClient) error {
  482. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  483. defer cancel()
  484. // Try to find broker leader
  485. _, err := client.FindBrokerLeader(ctx, &mq_pb.FindBrokerLeaderRequest{
  486. FilerGroup: "",
  487. })
  488. if err == nil {
  489. return nil // This broker is the leader
  490. }
  491. return err
  492. })
  493. if err == nil {
  494. return brokerAddr, nil
  495. }
  496. }
  497. return "", fmt.Errorf("no broker leader found")
  498. }
  499. // withBrokerClient connects to a message queue broker and executes a function
  500. func (s *AdminServer) withBrokerClient(brokerAddress string, fn func(client mq_pb.SeaweedMessagingClient) error) error {
  501. return pb.WithBrokerGrpcClient(false, brokerAddress, s.grpcDialOption, fn)
  502. }
  503. // convertTopicRetention converts protobuf retention to TopicRetentionInfo
  504. func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
  505. if retention == nil || !retention.Enabled {
  506. return TopicRetentionInfo{
  507. Enabled: false,
  508. RetentionSeconds: 0,
  509. DisplayValue: 0,
  510. DisplayUnit: "days",
  511. }
  512. }
  513. // Convert seconds to human-readable format
  514. seconds := retention.RetentionSeconds
  515. var displayValue int32
  516. var displayUnit string
  517. if seconds >= 86400 { // >= 1 day
  518. displayValue = int32(seconds / 86400)
  519. displayUnit = "days"
  520. } else if seconds >= 3600 { // >= 1 hour
  521. displayValue = int32(seconds / 3600)
  522. displayUnit = "hours"
  523. } else {
  524. displayValue = int32(seconds)
  525. displayUnit = "seconds"
  526. }
  527. return TopicRetentionInfo{
  528. Enabled: retention.Enabled,
  529. RetentionSeconds: seconds,
  530. DisplayValue: displayValue,
  531. DisplayUnit: displayUnit,
  532. }
  533. }