topic_retention.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package dash
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "path/filepath"
  7. "sort"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  14. )
  15. // TopicRetentionPurger handles topic data purging based on retention policies
  16. type TopicRetentionPurger struct {
  17. adminServer *AdminServer
  18. }
  19. // NewTopicRetentionPurger creates a new topic retention purger
  20. func NewTopicRetentionPurger(adminServer *AdminServer) *TopicRetentionPurger {
  21. return &TopicRetentionPurger{
  22. adminServer: adminServer,
  23. }
  24. }
  25. // PurgeExpiredTopicData purges expired topic data based on retention policies
  26. func (p *TopicRetentionPurger) PurgeExpiredTopicData() error {
  27. glog.V(1).Infof("Starting topic data purge based on retention policies")
  28. // Get all topics with retention enabled
  29. topics, err := p.getTopicsWithRetention()
  30. if err != nil {
  31. return fmt.Errorf("failed to get topics with retention: %w", err)
  32. }
  33. glog.V(1).Infof("Found %d topics with retention enabled", len(topics))
  34. // Process each topic
  35. for _, topicRetention := range topics {
  36. err := p.purgeTopicData(topicRetention)
  37. if err != nil {
  38. glog.Errorf("Failed to purge data for topic %s: %v", topicRetention.TopicName, err)
  39. continue
  40. }
  41. }
  42. glog.V(1).Infof("Completed topic data purge")
  43. return nil
  44. }
  // TopicRetentionConfig identifies a topic that has retention enabled, together
// with how long its data should be kept.
type TopicRetentionConfig struct {
	// TopicName is "<Namespace>.<Name>" and is used in log messages;
	// Namespace and Name identify the topic itself.
	TopicName, Namespace, Name string
	// RetentionSeconds is the retention period. Version directories older than
	// now minus this many seconds are eligible for purging.
	RetentionSeconds int64
}
  52. // getTopicsWithRetention retrieves all topics that have retention enabled
  53. func (p *TopicRetentionPurger) getTopicsWithRetention() ([]TopicRetentionConfig, error) {
  54. var topicsWithRetention []TopicRetentionConfig
  55. // Find broker leader to get topics
  56. brokerLeader, err := p.adminServer.findBrokerLeader()
  57. if err != nil {
  58. return nil, fmt.Errorf("failed to find broker leader: %w", err)
  59. }
  60. // Get all topics from the broker
  61. err = p.adminServer.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
  62. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  63. defer cancel()
  64. resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
  65. if err != nil {
  66. return err
  67. }
  68. // Check each topic for retention configuration
  69. for _, pbTopic := range resp.Topics {
  70. configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
  71. Topic: pbTopic,
  72. })
  73. if err != nil {
  74. glog.Warningf("Failed to get configuration for topic %s.%s: %v", pbTopic.Namespace, pbTopic.Name, err)
  75. continue
  76. }
  77. // Check if retention is enabled
  78. if configResp.Retention != nil && configResp.Retention.Enabled && configResp.Retention.RetentionSeconds > 0 {
  79. topicRetention := TopicRetentionConfig{
  80. TopicName: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
  81. Namespace: pbTopic.Namespace,
  82. Name: pbTopic.Name,
  83. RetentionSeconds: configResp.Retention.RetentionSeconds,
  84. }
  85. topicsWithRetention = append(topicsWithRetention, topicRetention)
  86. }
  87. }
  88. return nil
  89. })
  90. if err != nil {
  91. return nil, err
  92. }
  93. return topicsWithRetention, nil
  94. }
  95. // purgeTopicData purges expired data for a specific topic
  96. func (p *TopicRetentionPurger) purgeTopicData(topicRetention TopicRetentionConfig) error {
  97. glog.V(1).Infof("Purging expired data for topic %s with retention %d seconds", topicRetention.TopicName, topicRetention.RetentionSeconds)
  98. // Calculate cutoff time
  99. cutoffTime := time.Now().Add(-time.Duration(topicRetention.RetentionSeconds) * time.Second)
  100. // Get topic directory
  101. topicObj := topic.NewTopic(topicRetention.Namespace, topicRetention.Name)
  102. topicDir := topicObj.Dir()
  103. var purgedDirs []string
  104. err := p.adminServer.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  105. // List all version directories under the topic directory
  106. versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  107. Directory: topicDir,
  108. Prefix: "",
  109. StartFromFileName: "",
  110. InclusiveStartFrom: false,
  111. Limit: 1000,
  112. })
  113. if err != nil {
  114. return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
  115. }
  116. var versionDirs []VersionDirInfo
  117. // Collect all version directories
  118. for {
  119. versionResp, err := versionStream.Recv()
  120. if err != nil {
  121. if err == io.EOF {
  122. break
  123. }
  124. return fmt.Errorf("failed to receive version entries: %w", err)
  125. }
  126. // Only process directories that are versions (start with "v")
  127. if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
  128. versionTime, err := p.parseVersionTime(versionResp.Entry.Name)
  129. if err != nil {
  130. glog.Warningf("Failed to parse version time from %s: %v", versionResp.Entry.Name, err)
  131. continue
  132. }
  133. versionDirs = append(versionDirs, VersionDirInfo{
  134. Name: versionResp.Entry.Name,
  135. VersionTime: versionTime,
  136. ModTime: time.Unix(versionResp.Entry.Attributes.Mtime, 0),
  137. })
  138. }
  139. }
  140. // Sort version directories by time (oldest first)
  141. sort.Slice(versionDirs, func(i, j int) bool {
  142. return versionDirs[i].VersionTime.Before(versionDirs[j].VersionTime)
  143. })
  144. // Keep at least the most recent version directory, even if it's expired
  145. if len(versionDirs) <= 1 {
  146. glog.V(1).Infof("Topic %s has %d version directories, keeping all", topicRetention.TopicName, len(versionDirs))
  147. return nil
  148. }
  149. // Purge expired directories (keep the most recent one)
  150. for i := 0; i < len(versionDirs)-1; i++ {
  151. versionDir := versionDirs[i]
  152. // Check if this version directory is expired
  153. if versionDir.VersionTime.Before(cutoffTime) {
  154. dirPath := filepath.Join(topicDir, versionDir.Name)
  155. // Delete the entire version directory
  156. err := p.deleteDirectoryRecursively(client, dirPath)
  157. if err != nil {
  158. glog.Errorf("Failed to delete expired directory %s: %v", dirPath, err)
  159. } else {
  160. purgedDirs = append(purgedDirs, dirPath)
  161. glog.V(1).Infof("Purged expired directory: %s (created: %s)", dirPath, versionDir.VersionTime.Format("2006-01-02 15:04:05"))
  162. }
  163. }
  164. }
  165. return nil
  166. })
  167. if err != nil {
  168. return err
  169. }
  170. if len(purgedDirs) > 0 {
  171. glog.V(0).Infof("Purged %d expired directories for topic %s", len(purgedDirs), topicRetention.TopicName)
  172. }
  173. return nil
  174. }
  // VersionDirInfo describes one version directory found under a topic directory.
type VersionDirInfo struct {
	// Name is the directory's base name, e.g. "v2025-01-10-05-44-34".
	Name string
	// VersionTime is the timestamp parsed from Name. ModTime is the directory
	// entry's modification time as reported by the filer, in whole seconds.
	VersionTime, ModTime time.Time
}
  181. // parseVersionTime parses the version directory name to extract the timestamp
  182. // Version format: v2025-01-10-05-44-34
  183. func (p *TopicRetentionPurger) parseVersionTime(versionName string) (time.Time, error) {
  184. // Remove the 'v' prefix
  185. if !strings.HasPrefix(versionName, "v") {
  186. return time.Time{}, fmt.Errorf("invalid version format: %s", versionName)
  187. }
  188. timeStr := versionName[1:] // Remove 'v'
  189. // Parse the time format: 2025-01-10-05-44-34
  190. versionTime, err := time.Parse("2006-01-02-15-04-05", timeStr)
  191. if err != nil {
  192. return time.Time{}, fmt.Errorf("failed to parse version time %s: %v", timeStr, err)
  193. }
  194. return versionTime, nil
  195. }
  196. // deleteDirectoryRecursively deletes a directory and all its contents
  197. func (p *TopicRetentionPurger) deleteDirectoryRecursively(client filer_pb.SeaweedFilerClient, dirPath string) error {
  198. // List all entries in the directory
  199. stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
  200. Directory: dirPath,
  201. Prefix: "",
  202. StartFromFileName: "",
  203. InclusiveStartFrom: false,
  204. Limit: 1000,
  205. })
  206. if err != nil {
  207. return fmt.Errorf("failed to list directory %s: %v", dirPath, err)
  208. }
  209. // Delete all entries
  210. for {
  211. resp, err := stream.Recv()
  212. if err != nil {
  213. if err == io.EOF {
  214. break
  215. }
  216. return fmt.Errorf("failed to receive entries: %w", err)
  217. }
  218. entryPath := filepath.Join(dirPath, resp.Entry.Name)
  219. if resp.Entry.IsDirectory {
  220. // Recursively delete subdirectory
  221. err = p.deleteDirectoryRecursively(client, entryPath)
  222. if err != nil {
  223. return fmt.Errorf("failed to delete subdirectory %s: %v", entryPath, err)
  224. }
  225. } else {
  226. // Delete file
  227. _, err = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
  228. Directory: dirPath,
  229. Name: resp.Entry.Name,
  230. })
  231. if err != nil {
  232. return fmt.Errorf("failed to delete file %s: %v", entryPath, err)
  233. }
  234. }
  235. }
  236. // Delete the directory itself
  237. parentDir := filepath.Dir(dirPath)
  238. dirName := filepath.Base(dirPath)
  239. _, err = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
  240. Directory: parentDir,
  241. Name: dirName,
  242. })
  243. if err != nil {
  244. return fmt.Errorf("failed to delete directory %s: %v", dirPath, err)
  245. }
  246. return nil
  247. }