| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package dash
- import (
- "context"
- "fmt"
- "io"
- "path/filepath"
- "sort"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- )
- // TopicRetentionPurger handles topic data purging based on retention policies
- type TopicRetentionPurger struct {
- adminServer *AdminServer
- }
- // NewTopicRetentionPurger creates a new topic retention purger
- func NewTopicRetentionPurger(adminServer *AdminServer) *TopicRetentionPurger {
- return &TopicRetentionPurger{
- adminServer: adminServer,
- }
- }
- // PurgeExpiredTopicData purges expired topic data based on retention policies
- func (p *TopicRetentionPurger) PurgeExpiredTopicData() error {
- glog.V(1).Infof("Starting topic data purge based on retention policies")
- // Get all topics with retention enabled
- topics, err := p.getTopicsWithRetention()
- if err != nil {
- return fmt.Errorf("failed to get topics with retention: %w", err)
- }
- glog.V(1).Infof("Found %d topics with retention enabled", len(topics))
- // Process each topic
- for _, topicRetention := range topics {
- err := p.purgeTopicData(topicRetention)
- if err != nil {
- glog.Errorf("Failed to purge data for topic %s: %v", topicRetention.TopicName, err)
- continue
- }
- }
- glog.V(1).Infof("Completed topic data purge")
- return nil
- }
- // TopicRetentionConfig represents a topic with its retention configuration
- type TopicRetentionConfig struct {
- TopicName string
- Namespace string
- Name string
- RetentionSeconds int64
- }
- // getTopicsWithRetention retrieves all topics that have retention enabled
- func (p *TopicRetentionPurger) getTopicsWithRetention() ([]TopicRetentionConfig, error) {
- var topicsWithRetention []TopicRetentionConfig
- // Find broker leader to get topics
- brokerLeader, err := p.adminServer.findBrokerLeader()
- if err != nil {
- return nil, fmt.Errorf("failed to find broker leader: %w", err)
- }
- // Get all topics from the broker
- err = p.adminServer.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
- if err != nil {
- return err
- }
- // Check each topic for retention configuration
- for _, pbTopic := range resp.Topics {
- configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
- Topic: pbTopic,
- })
- if err != nil {
- glog.Warningf("Failed to get configuration for topic %s.%s: %v", pbTopic.Namespace, pbTopic.Name, err)
- continue
- }
- // Check if retention is enabled
- if configResp.Retention != nil && configResp.Retention.Enabled && configResp.Retention.RetentionSeconds > 0 {
- topicRetention := TopicRetentionConfig{
- TopicName: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
- Namespace: pbTopic.Namespace,
- Name: pbTopic.Name,
- RetentionSeconds: configResp.Retention.RetentionSeconds,
- }
- topicsWithRetention = append(topicsWithRetention, topicRetention)
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return topicsWithRetention, nil
- }
- // purgeTopicData purges expired data for a specific topic
- func (p *TopicRetentionPurger) purgeTopicData(topicRetention TopicRetentionConfig) error {
- glog.V(1).Infof("Purging expired data for topic %s with retention %d seconds", topicRetention.TopicName, topicRetention.RetentionSeconds)
- // Calculate cutoff time
- cutoffTime := time.Now().Add(-time.Duration(topicRetention.RetentionSeconds) * time.Second)
- // Get topic directory
- topicObj := topic.NewTopic(topicRetention.Namespace, topicRetention.Name)
- topicDir := topicObj.Dir()
- var purgedDirs []string
- err := p.adminServer.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- // List all version directories under the topic directory
- versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: topicDir,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000,
- })
- if err != nil {
- return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
- }
- var versionDirs []VersionDirInfo
- // Collect all version directories
- for {
- versionResp, err := versionStream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("failed to receive version entries: %w", err)
- }
- // Only process directories that are versions (start with "v")
- if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
- versionTime, err := p.parseVersionTime(versionResp.Entry.Name)
- if err != nil {
- glog.Warningf("Failed to parse version time from %s: %v", versionResp.Entry.Name, err)
- continue
- }
- versionDirs = append(versionDirs, VersionDirInfo{
- Name: versionResp.Entry.Name,
- VersionTime: versionTime,
- ModTime: time.Unix(versionResp.Entry.Attributes.Mtime, 0),
- })
- }
- }
- // Sort version directories by time (oldest first)
- sort.Slice(versionDirs, func(i, j int) bool {
- return versionDirs[i].VersionTime.Before(versionDirs[j].VersionTime)
- })
- // Keep at least the most recent version directory, even if it's expired
- if len(versionDirs) <= 1 {
- glog.V(1).Infof("Topic %s has %d version directories, keeping all", topicRetention.TopicName, len(versionDirs))
- return nil
- }
- // Purge expired directories (keep the most recent one)
- for i := 0; i < len(versionDirs)-1; i++ {
- versionDir := versionDirs[i]
- // Check if this version directory is expired
- if versionDir.VersionTime.Before(cutoffTime) {
- dirPath := filepath.Join(topicDir, versionDir.Name)
- // Delete the entire version directory
- err := p.deleteDirectoryRecursively(client, dirPath)
- if err != nil {
- glog.Errorf("Failed to delete expired directory %s: %v", dirPath, err)
- } else {
- purgedDirs = append(purgedDirs, dirPath)
- glog.V(1).Infof("Purged expired directory: %s (created: %s)", dirPath, versionDir.VersionTime.Format("2006-01-02 15:04:05"))
- }
- }
- }
- return nil
- })
- if err != nil {
- return err
- }
- if len(purgedDirs) > 0 {
- glog.V(0).Infof("Purged %d expired directories for topic %s", len(purgedDirs), topicRetention.TopicName)
- }
- return nil
- }
- // VersionDirInfo represents a version directory with its timestamp
- type VersionDirInfo struct {
- Name string
- VersionTime time.Time
- ModTime time.Time
- }
- // parseVersionTime parses the version directory name to extract the timestamp
- // Version format: v2025-01-10-05-44-34
- func (p *TopicRetentionPurger) parseVersionTime(versionName string) (time.Time, error) {
- // Remove the 'v' prefix
- if !strings.HasPrefix(versionName, "v") {
- return time.Time{}, fmt.Errorf("invalid version format: %s", versionName)
- }
- timeStr := versionName[1:] // Remove 'v'
- // Parse the time format: 2025-01-10-05-44-34
- versionTime, err := time.Parse("2006-01-02-15-04-05", timeStr)
- if err != nil {
- return time.Time{}, fmt.Errorf("failed to parse version time %s: %v", timeStr, err)
- }
- return versionTime, nil
- }
- // deleteDirectoryRecursively deletes a directory and all its contents
- func (p *TopicRetentionPurger) deleteDirectoryRecursively(client filer_pb.SeaweedFilerClient, dirPath string) error {
- // List all entries in the directory
- stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: dirPath,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000,
- })
- if err != nil {
- return fmt.Errorf("failed to list directory %s: %v", dirPath, err)
- }
- // Delete all entries
- for {
- resp, err := stream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("failed to receive entries: %w", err)
- }
- entryPath := filepath.Join(dirPath, resp.Entry.Name)
- if resp.Entry.IsDirectory {
- // Recursively delete subdirectory
- err = p.deleteDirectoryRecursively(client, entryPath)
- if err != nil {
- return fmt.Errorf("failed to delete subdirectory %s: %v", entryPath, err)
- }
- } else {
- // Delete file
- _, err = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
- Directory: dirPath,
- Name: resp.Entry.Name,
- })
- if err != nil {
- return fmt.Errorf("failed to delete file %s: %v", entryPath, err)
- }
- }
- }
- // Delete the directory itself
- parentDir := filepath.Dir(dirPath)
- dirName := filepath.Base(dirPath)
- _, err = client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
- Directory: parentDir,
- Name: dirName,
- })
- if err != nil {
- return fmt.Errorf("failed to delete directory %s: %v", dirPath, err)
- }
- return nil
- }
|