| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615 |
- package dash
- import (
- "context"
- "fmt"
- "io"
- "path/filepath"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/cluster"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
- // GetTopics retrieves message queue topics data
- func (s *AdminServer) GetTopics() (*TopicsData, error) {
- var topics []TopicInfo
- // Find broker leader and get topics
- brokerLeader, err := s.findBrokerLeader()
- if err != nil {
- // If no broker leader found, return empty data
- return &TopicsData{
- Topics: topics,
- TotalTopics: len(topics),
- LastUpdated: time.Now(),
- }, nil
- }
- // Connect to broker leader and list topics
- err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
- if err != nil {
- return err
- }
- // Convert protobuf topics to TopicInfo - only include available data
- for _, pbTopic := range resp.Topics {
- topicInfo := TopicInfo{
- Name: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
- Partitions: 0, // Will be populated by LookupTopicBrokers call
- Retention: TopicRetentionInfo{
- Enabled: false,
- DisplayValue: 0,
- DisplayUnit: "days",
- },
- }
- // Get topic configuration to get partition count and retention info
- lookupResp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{
- Topic: pbTopic,
- })
- if err == nil {
- topicInfo.Partitions = len(lookupResp.BrokerPartitionAssignments)
- }
- // Get topic configuration for retention information
- configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
- Topic: pbTopic,
- })
- if err == nil && configResp.Retention != nil {
- topicInfo.Retention = convertTopicRetention(configResp.Retention)
- }
- topics = append(topics, topicInfo)
- }
- return nil
- })
- if err != nil {
- // If connection fails, return empty data
- return &TopicsData{
- Topics: topics,
- TotalTopics: len(topics),
- LastUpdated: time.Now(),
- }, nil
- }
- return &TopicsData{
- Topics: topics,
- TotalTopics: len(topics),
- LastUpdated: time.Now(),
- // Don't include TotalMessages and TotalSize as they're not available
- }, nil
- }
- // GetSubscribers retrieves message queue subscribers data
- func (s *AdminServer) GetSubscribers() (*SubscribersData, error) {
- var subscribers []SubscriberInfo
- // Find broker leader and get subscriber info from broker stats
- brokerLeader, err := s.findBrokerLeader()
- if err != nil {
- // If no broker leader found, return empty data
- return &SubscribersData{
- Subscribers: subscribers,
- TotalSubscribers: len(subscribers),
- ActiveSubscribers: 0,
- LastUpdated: time.Now(),
- }, nil
- }
- // Connect to broker leader and get subscriber information
- // Note: SeaweedMQ doesn't have a direct API to list all subscribers
- // We would need to collect this information from broker statistics
- // For now, return empty data structure as subscriber info is not
- // directly available through the current MQ API
- err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
- // TODO: Implement subscriber data collection from broker statistics
- // This would require access to broker internal statistics about
- // active subscribers, consumer groups, etc.
- return nil
- })
- if err != nil {
- // If connection fails, return empty data
- return &SubscribersData{
- Subscribers: subscribers,
- TotalSubscribers: len(subscribers),
- ActiveSubscribers: 0,
- LastUpdated: time.Now(),
- }, nil
- }
- activeCount := 0
- for _, sub := range subscribers {
- if sub.Status == "active" {
- activeCount++
- }
- }
- return &SubscribersData{
- Subscribers: subscribers,
- TotalSubscribers: len(subscribers),
- ActiveSubscribers: activeCount,
- LastUpdated: time.Now(),
- }, nil
- }
- // GetTopicDetails retrieves detailed information about a specific topic
- func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetailsData, error) {
- // Find broker leader
- brokerLeader, err := s.findBrokerLeader()
- if err != nil {
- return nil, fmt.Errorf("failed to find broker leader: %w", err)
- }
- var topicDetails *TopicDetailsData
- // Connect to broker leader and get topic configuration
- err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- // Get topic configuration using the new API
- configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
- Topic: &schema_pb.Topic{
- Namespace: namespace,
- Name: topicName,
- },
- })
- if err != nil {
- return fmt.Errorf("failed to get topic configuration: %w", err)
- }
- // Initialize topic details
- topicDetails = &TopicDetailsData{
- TopicName: fmt.Sprintf("%s.%s", namespace, topicName),
- Namespace: namespace,
- Name: topicName,
- Partitions: []PartitionInfo{},
- Schema: []SchemaFieldInfo{},
- Publishers: []PublisherInfo{},
- Subscribers: []TopicSubscriberInfo{},
- ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
- Retention: convertTopicRetention(configResp.Retention),
- CreatedAt: time.Unix(0, configResp.CreatedAtNs),
- LastUpdated: time.Unix(0, configResp.LastUpdatedNs),
- }
- // Set current time if timestamps are not available
- if configResp.CreatedAtNs == 0 {
- topicDetails.CreatedAt = time.Now()
- }
- if configResp.LastUpdatedNs == 0 {
- topicDetails.LastUpdated = time.Now()
- }
- // Process partitions
- for _, assignment := range configResp.BrokerPartitionAssignments {
- if assignment.Partition != nil {
- partitionInfo := PartitionInfo{
- ID: assignment.Partition.RangeStart,
- LeaderBroker: assignment.LeaderBroker,
- FollowerBroker: assignment.FollowerBroker,
- MessageCount: 0, // Will be enhanced later with actual stats
- TotalSize: 0, // Will be enhanced later with actual stats
- LastDataTime: time.Time{}, // Will be enhanced later
- CreatedAt: time.Now(),
- }
- topicDetails.Partitions = append(topicDetails.Partitions, partitionInfo)
- }
- }
- // Process schema from RecordType
- if configResp.RecordType != nil {
- topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
- }
- // Get publishers information
- publishersResp, err := client.GetTopicPublishers(ctx, &mq_pb.GetTopicPublishersRequest{
- Topic: &schema_pb.Topic{
- Namespace: namespace,
- Name: topicName,
- },
- })
- if err != nil {
- // Log error but don't fail the entire request
- glog.V(0).Infof("failed to get topic publishers for %s.%s: %v", namespace, topicName, err)
- } else {
- glog.V(1).Infof("got %d publishers for topic %s.%s", len(publishersResp.Publishers), namespace, topicName)
- topicDetails.Publishers = convertTopicPublishers(publishersResp.Publishers)
- }
- // Get subscribers information
- subscribersResp, err := client.GetTopicSubscribers(ctx, &mq_pb.GetTopicSubscribersRequest{
- Topic: &schema_pb.Topic{
- Namespace: namespace,
- Name: topicName,
- },
- })
- if err != nil {
- // Log error but don't fail the entire request
- glog.V(0).Infof("failed to get topic subscribers for %s.%s: %v", namespace, topicName, err)
- } else {
- glog.V(1).Infof("got %d subscribers for topic %s.%s", len(subscribersResp.Subscribers), namespace, topicName)
- topicDetails.Subscribers = convertTopicSubscribers(subscribersResp.Subscribers)
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- // Get consumer group offsets from the filer
- offsets, err := s.GetConsumerGroupOffsets(namespace, topicName)
- if err != nil {
- // Log error but don't fail the entire request
- glog.V(0).Infof("failed to get consumer group offsets for %s.%s: %v", namespace, topicName, err)
- } else {
- glog.V(1).Infof("got %d consumer group offsets for topic %s.%s", len(offsets), namespace, topicName)
- topicDetails.ConsumerGroupOffsets = offsets
- }
- return topicDetails, nil
- }
- // GetConsumerGroupOffsets retrieves consumer group offsets for a topic from the filer
- func (s *AdminServer) GetConsumerGroupOffsets(namespace, topicName string) ([]ConsumerGroupOffsetInfo, error) {
- var offsets []ConsumerGroupOffsetInfo
- err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- // Get the topic directory: /topics/namespace/topicName
- topicObj := topic.NewTopic(namespace, topicName)
- topicDir := topicObj.Dir()
- // List all version directories under the topic directory (e.g., v2025-07-10-05-44-34)
- versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: topicDir,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000,
- })
- if err != nil {
- return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
- }
- // Process each version directory
- for {
- versionResp, err := versionStream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("failed to receive version entries: %w", err)
- }
- // Only process directories that are versions (start with "v")
- if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
- versionDir := filepath.Join(topicDir, versionResp.Entry.Name)
- // List all partition directories under the version directory (e.g., 0315-0630)
- partitionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: versionDir,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000,
- })
- if err != nil {
- glog.Warningf("Failed to list version directory %s: %v", versionDir, err)
- continue
- }
- // Process each partition directory
- for {
- partitionResp, err := partitionStream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- glog.Warningf("Failed to receive partition entries: %v", err)
- break
- }
- // Only process directories that are partitions (format: NNNN-NNNN)
- if partitionResp.Entry.IsDirectory {
- // Parse partition range to get partition start ID (e.g., "0315-0630" -> 315)
- var partitionStart, partitionStop int32
- if n, err := fmt.Sscanf(partitionResp.Entry.Name, "%04d-%04d", &partitionStart, &partitionStop); n != 2 || err != nil {
- // Skip directories that don't match the partition format
- continue
- }
- partitionDir := filepath.Join(versionDir, partitionResp.Entry.Name)
- // List all .offset files in this partition directory
- offsetStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: partitionDir,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000,
- })
- if err != nil {
- glog.Warningf("Failed to list partition directory %s: %v", partitionDir, err)
- continue
- }
- // Process each offset file
- for {
- offsetResp, err := offsetStream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- glog.Warningf("Failed to receive offset entries: %v", err)
- break
- }
- // Only process .offset files
- if !offsetResp.Entry.IsDirectory && strings.HasSuffix(offsetResp.Entry.Name, ".offset") {
- consumerGroup := strings.TrimSuffix(offsetResp.Entry.Name, ".offset")
- // Read the offset value from the file
- offsetData, err := filer.ReadInsideFiler(client, partitionDir, offsetResp.Entry.Name)
- if err != nil {
- glog.Warningf("Failed to read offset file %s: %v", offsetResp.Entry.Name, err)
- continue
- }
- if len(offsetData) == 8 {
- offset := int64(util.BytesToUint64(offsetData))
- // Get the file modification time
- lastUpdated := time.Unix(offsetResp.Entry.Attributes.Mtime, 0)
- offsets = append(offsets, ConsumerGroupOffsetInfo{
- ConsumerGroup: consumerGroup,
- PartitionID: partitionStart, // Use partition start as the ID
- Offset: offset,
- LastUpdated: lastUpdated,
- })
- }
- }
- }
- }
- }
- }
- }
- return nil
- })
- if err != nil {
- return nil, fmt.Errorf("failed to get consumer group offsets: %w", err)
- }
- return offsets, nil
- }
- // convertRecordTypeToSchemaFields converts a protobuf RecordType to SchemaFieldInfo slice
- func convertRecordTypeToSchemaFields(recordType *schema_pb.RecordType) []SchemaFieldInfo {
- var schemaFields []SchemaFieldInfo
- if recordType == nil || recordType.Fields == nil {
- return schemaFields
- }
- for _, field := range recordType.Fields {
- schemaField := SchemaFieldInfo{
- Name: field.Name,
- Type: getFieldTypeString(field.Type),
- Required: field.IsRequired,
- }
- schemaFields = append(schemaFields, schemaField)
- }
- return schemaFields
- }
- // getFieldTypeString converts a protobuf Type to a human-readable string
- func getFieldTypeString(fieldType *schema_pb.Type) string {
- if fieldType == nil {
- return "unknown"
- }
- switch kind := fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- return getScalarTypeString(kind.ScalarType)
- case *schema_pb.Type_RecordType:
- return "record"
- case *schema_pb.Type_ListType:
- elementType := getFieldTypeString(kind.ListType.ElementType)
- return fmt.Sprintf("list<%s>", elementType)
- default:
- return "unknown"
- }
- }
- // getScalarTypeString converts a protobuf ScalarType to a string
- func getScalarTypeString(scalarType schema_pb.ScalarType) string {
- switch scalarType {
- case schema_pb.ScalarType_BOOL:
- return "bool"
- case schema_pb.ScalarType_INT32:
- return "int32"
- case schema_pb.ScalarType_INT64:
- return "int64"
- case schema_pb.ScalarType_FLOAT:
- return "float"
- case schema_pb.ScalarType_DOUBLE:
- return "double"
- case schema_pb.ScalarType_BYTES:
- return "bytes"
- case schema_pb.ScalarType_STRING:
- return "string"
- default:
- return "unknown"
- }
- }
- // convertTopicPublishers converts protobuf TopicPublisher slice to PublisherInfo slice
- func convertTopicPublishers(publishers []*mq_pb.TopicPublisher) []PublisherInfo {
- publisherInfos := make([]PublisherInfo, 0, len(publishers))
- for _, publisher := range publishers {
- publisherInfo := PublisherInfo{
- PublisherName: publisher.PublisherName,
- ClientID: publisher.ClientId,
- PartitionID: publisher.Partition.RangeStart,
- Broker: publisher.Broker,
- IsActive: publisher.IsActive,
- LastPublishedOffset: publisher.LastPublishedOffset,
- LastAckedOffset: publisher.LastAckedOffset,
- }
- // Convert timestamps
- if publisher.ConnectTimeNs > 0 {
- publisherInfo.ConnectTime = time.Unix(0, publisher.ConnectTimeNs)
- }
- if publisher.LastSeenTimeNs > 0 {
- publisherInfo.LastSeenTime = time.Unix(0, publisher.LastSeenTimeNs)
- }
- publisherInfos = append(publisherInfos, publisherInfo)
- }
- return publisherInfos
- }
- // convertTopicSubscribers converts protobuf TopicSubscriber slice to TopicSubscriberInfo slice
- func convertTopicSubscribers(subscribers []*mq_pb.TopicSubscriber) []TopicSubscriberInfo {
- subscriberInfos := make([]TopicSubscriberInfo, 0, len(subscribers))
- for _, subscriber := range subscribers {
- subscriberInfo := TopicSubscriberInfo{
- ConsumerGroup: subscriber.ConsumerGroup,
- ConsumerID: subscriber.ConsumerId,
- ClientID: subscriber.ClientId,
- PartitionID: subscriber.Partition.RangeStart,
- Broker: subscriber.Broker,
- IsActive: subscriber.IsActive,
- CurrentOffset: subscriber.CurrentOffset,
- LastReceivedOffset: subscriber.LastReceivedOffset,
- }
- // Convert timestamps
- if subscriber.ConnectTimeNs > 0 {
- subscriberInfo.ConnectTime = time.Unix(0, subscriber.ConnectTimeNs)
- }
- if subscriber.LastSeenTimeNs > 0 {
- subscriberInfo.LastSeenTime = time.Unix(0, subscriber.LastSeenTimeNs)
- }
- subscriberInfos = append(subscriberInfos, subscriberInfo)
- }
- return subscriberInfos
- }
- // findBrokerLeader finds the current broker leader
- func (s *AdminServer) findBrokerLeader() (string, error) {
- // First, try to find any broker from the cluster
- var brokers []string
- err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.BrokerType,
- })
- if err != nil {
- return err
- }
- for _, node := range resp.ClusterNodes {
- brokers = append(brokers, node.Address)
- }
- return nil
- })
- if err != nil {
- return "", fmt.Errorf("failed to list brokers: %w", err)
- }
- if len(brokers) == 0 {
- return "", fmt.Errorf("no brokers found in cluster")
- }
- // Try each broker to find the leader
- for _, brokerAddr := range brokers {
- err := s.withBrokerClient(brokerAddr, func(client mq_pb.SeaweedMessagingClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- // Try to find broker leader
- _, err := client.FindBrokerLeader(ctx, &mq_pb.FindBrokerLeaderRequest{
- FilerGroup: "",
- })
- if err == nil {
- return nil // This broker is the leader
- }
- return err
- })
- if err == nil {
- return brokerAddr, nil
- }
- }
- return "", fmt.Errorf("no broker leader found")
- }
- // withBrokerClient connects to a message queue broker and executes a function
- func (s *AdminServer) withBrokerClient(brokerAddress string, fn func(client mq_pb.SeaweedMessagingClient) error) error {
- return pb.WithBrokerGrpcClient(false, brokerAddress, s.grpcDialOption, fn)
- }
- // convertTopicRetention converts protobuf retention to TopicRetentionInfo
- func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
- if retention == nil || !retention.Enabled {
- return TopicRetentionInfo{
- Enabled: false,
- RetentionSeconds: 0,
- DisplayValue: 0,
- DisplayUnit: "days",
- }
- }
- // Convert seconds to human-readable format
- seconds := retention.RetentionSeconds
- var displayValue int32
- var displayUnit string
- if seconds >= 86400 { // >= 1 day
- displayValue = int32(seconds / 86400)
- displayUnit = "days"
- } else if seconds >= 3600 { // >= 1 hour
- displayValue = int32(seconds / 3600)
- displayUnit = "hours"
- } else {
- displayValue = int32(seconds)
- displayUnit = "seconds"
- }
- return TopicRetentionInfo{
- Enabled: retention.Enabled,
- RetentionSeconds: seconds,
- DisplayValue: displayValue,
- DisplayUnit: displayUnit,
- }
- }
|