topic.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package topic
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. jsonpb "google.golang.org/protobuf/encoding/protojson"
  15. )
  16. type Topic struct {
  17. Namespace string
  18. Name string
  19. }
  20. func NewTopic(namespace string, name string) Topic {
  21. return Topic{
  22. Namespace: namespace,
  23. Name: name,
  24. }
  25. }
  26. func FromPbTopic(topic *schema_pb.Topic) Topic {
  27. return Topic{
  28. Namespace: topic.Namespace,
  29. Name: topic.Name,
  30. }
  31. }
  32. func (t Topic) ToPbTopic() *schema_pb.Topic {
  33. return &schema_pb.Topic{
  34. Namespace: t.Namespace,
  35. Name: t.Name,
  36. }
  37. }
  38. func (t Topic) String() string {
  39. return fmt.Sprintf("%s.%s", t.Namespace, t.Name)
  40. }
  41. func (t Topic) Dir() string {
  42. return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  43. }
  44. func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, error) {
  45. data, err := filer.ReadInsideFiler(client, t.Dir(), filer.TopicConfFile)
  46. if errors.Is(err, filer_pb.ErrNotFound) {
  47. return nil, err
  48. }
  49. if err != nil {
  50. return nil, fmt.Errorf("read topic.conf of %v: %w", t, err)
  51. }
  52. // parse into filer conf object
  53. conf := &mq_pb.ConfigureTopicResponse{}
  54. if err = jsonpb.Unmarshal(data, conf); err != nil {
  55. return nil, fmt.Errorf("unmarshal topic %v conf: %w", t, err)
  56. }
  57. return conf, nil
  58. }
  59. // ReadConfFileWithMetadata reads the topic configuration and returns it along with file metadata
  60. func (t Topic) ReadConfFileWithMetadata(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, int64, int64, error) {
  61. // Use LookupDirectoryEntry to get both content and metadata
  62. request := &filer_pb.LookupDirectoryEntryRequest{
  63. Directory: t.Dir(),
  64. Name: filer.TopicConfFile,
  65. }
  66. resp, err := filer_pb.LookupEntry(context.Background(), client, request)
  67. if err != nil {
  68. if errors.Is(err, filer_pb.ErrNotFound) {
  69. return nil, 0, 0, err
  70. }
  71. return nil, 0, 0, fmt.Errorf("lookup topic.conf of %v: %w", t, err)
  72. }
  73. // Get file metadata
  74. var createdAtNs, modifiedAtNs int64
  75. if resp.Entry.Attributes != nil {
  76. createdAtNs = resp.Entry.Attributes.Crtime * 1e9 // convert seconds to nanoseconds
  77. modifiedAtNs = resp.Entry.Attributes.Mtime * 1e9 // convert seconds to nanoseconds
  78. }
  79. // Parse the configuration
  80. conf := &mq_pb.ConfigureTopicResponse{}
  81. if err = jsonpb.Unmarshal(resp.Entry.Content, conf); err != nil {
  82. return nil, 0, 0, fmt.Errorf("unmarshal topic %v conf: %w", t, err)
  83. }
  84. return conf, createdAtNs, modifiedAtNs, nil
  85. }
  86. func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.ConfigureTopicResponse) error {
  87. var buf bytes.Buffer
  88. filer.ProtoToText(&buf, conf)
  89. if err := filer.SaveInsideFiler(client, t.Dir(), filer.TopicConfFile, buf.Bytes()); err != nil {
  90. return fmt.Errorf("save topic %v conf: %w", t, err)
  91. }
  92. return nil
  93. }
  94. // DiscoverPartitions discovers all partition directories for a topic by scanning the filesystem
  95. // This centralizes partition discovery logic used across query engine, shell commands, etc.
  96. func (t Topic) DiscoverPartitions(ctx context.Context, filerClient filer_pb.FilerClient) ([]string, error) {
  97. var partitionPaths []string
  98. // Scan the topic directory for version directories (e.g., v2025-09-01-07-16-34)
  99. err := filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(t.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
  100. if !versionEntry.IsDirectory {
  101. return nil // Skip non-directories
  102. }
  103. // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
  104. if !IsValidVersionDirectory(versionEntry.Name) {
  105. // Skip directories that don't match the version format
  106. return nil
  107. }
  108. // Scan partition directories within this version (e.g., 0000-0630)
  109. versionDir := fmt.Sprintf("%s/%s", t.Dir(), versionEntry.Name)
  110. return filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
  111. if !partitionEntry.IsDirectory {
  112. return nil // Skip non-directories
  113. }
  114. // Parse partition boundary from directory name (e.g., "0000-0630")
  115. if !IsValidPartitionDirectory(partitionEntry.Name) {
  116. return nil // Skip invalid partition names
  117. }
  118. // Add this partition path to the list
  119. partitionPath := fmt.Sprintf("%s/%s", versionDir, partitionEntry.Name)
  120. partitionPaths = append(partitionPaths, partitionPath)
  121. return nil
  122. })
  123. })
  124. return partitionPaths, err
  125. }
  126. // IsValidVersionDirectory checks if a directory name matches the topic version format
  127. // Format: v2025-09-01-07-16-34
  128. func IsValidVersionDirectory(name string) bool {
  129. if !strings.HasPrefix(name, "v") || len(name) != 20 {
  130. return false
  131. }
  132. // Try to parse the timestamp part
  133. timestampStr := name[1:] // Remove 'v' prefix
  134. _, err := time.Parse("2006-01-02-15-04-05", timestampStr)
  135. return err == nil
  136. }
  137. // IsValidPartitionDirectory checks if a directory name matches the partition boundary format
  138. // Format: 0000-0630 (rangeStart-rangeStop)
  139. func IsValidPartitionDirectory(name string) bool {
  140. // Use existing ParsePartitionBoundary function to validate
  141. start, stop := ParsePartitionBoundary(name)
  142. // Valid partition ranges should have start < stop (and not both be 0, which indicates parse error)
  143. return start < stop && start >= 0
  144. }