| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544 |
- package integration
- import (
- "context"
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/karlseguin/ccache/v2"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/iam/policy"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
- )
- // RoleStore defines the interface for storing IAM role definitions
- type RoleStore interface {
- // StoreRole stores a role definition (filerAddress ignored for memory stores)
- StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error
- // GetRole retrieves a role definition (filerAddress ignored for memory stores)
- GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error)
- // ListRoles lists all role names (filerAddress ignored for memory stores)
- ListRoles(ctx context.Context, filerAddress string) ([]string, error)
- // DeleteRole deletes a role definition (filerAddress ignored for memory stores)
- DeleteRole(ctx context.Context, filerAddress string, roleName string) error
- }
- // MemoryRoleStore implements RoleStore using in-memory storage
- type MemoryRoleStore struct {
- roles map[string]*RoleDefinition
- mutex sync.RWMutex
- }
- // NewMemoryRoleStore creates a new memory-based role store
- func NewMemoryRoleStore() *MemoryRoleStore {
- return &MemoryRoleStore{
- roles: make(map[string]*RoleDefinition),
- }
- }
- // StoreRole stores a role definition in memory (filerAddress ignored for memory store)
- func (m *MemoryRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
- if roleName == "" {
- return fmt.Errorf("role name cannot be empty")
- }
- if role == nil {
- return fmt.Errorf("role cannot be nil")
- }
- m.mutex.Lock()
- defer m.mutex.Unlock()
- // Deep copy the role to prevent external modifications
- m.roles[roleName] = copyRoleDefinition(role)
- return nil
- }
- // GetRole retrieves a role definition from memory (filerAddress ignored for memory store)
- func (m *MemoryRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
- if roleName == "" {
- return nil, fmt.Errorf("role name cannot be empty")
- }
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- role, exists := m.roles[roleName]
- if !exists {
- return nil, fmt.Errorf("role not found: %s", roleName)
- }
- // Return a copy to prevent external modifications
- return copyRoleDefinition(role), nil
- }
- // ListRoles lists all role names in memory (filerAddress ignored for memory store)
- func (m *MemoryRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
- m.mutex.RLock()
- defer m.mutex.RUnlock()
- names := make([]string, 0, len(m.roles))
- for name := range m.roles {
- names = append(names, name)
- }
- return names, nil
- }
- // DeleteRole deletes a role definition from memory (filerAddress ignored for memory store)
- func (m *MemoryRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
- if roleName == "" {
- return fmt.Errorf("role name cannot be empty")
- }
- m.mutex.Lock()
- defer m.mutex.Unlock()
- delete(m.roles, roleName)
- return nil
- }
- // copyRoleDefinition creates a deep copy of a role definition
- func copyRoleDefinition(original *RoleDefinition) *RoleDefinition {
- if original == nil {
- return nil
- }
- copied := &RoleDefinition{
- RoleName: original.RoleName,
- RoleArn: original.RoleArn,
- Description: original.Description,
- }
- // Deep copy trust policy if it exists
- if original.TrustPolicy != nil {
- // Use JSON marshaling for deep copy of the complex policy structure
- trustPolicyData, _ := json.Marshal(original.TrustPolicy)
- var trustPolicyCopy policy.PolicyDocument
- json.Unmarshal(trustPolicyData, &trustPolicyCopy)
- copied.TrustPolicy = &trustPolicyCopy
- }
- // Copy attached policies slice
- if original.AttachedPolicies != nil {
- copied.AttachedPolicies = make([]string, len(original.AttachedPolicies))
- copy(copied.AttachedPolicies, original.AttachedPolicies)
- }
- return copied
- }
- // FilerRoleStore implements RoleStore using SeaweedFS filer
- type FilerRoleStore struct {
- grpcDialOption grpc.DialOption
- basePath string
- filerAddressProvider func() string
- }
- // NewFilerRoleStore creates a new filer-based role store
- func NewFilerRoleStore(config map[string]interface{}, filerAddressProvider func() string) (*FilerRoleStore, error) {
- store := &FilerRoleStore{
- basePath: "/etc/iam/roles", // Default path for role storage - aligned with /etc/ convention
- filerAddressProvider: filerAddressProvider,
- }
- // Parse configuration - only basePath and other settings, NOT filerAddress
- if config != nil {
- if basePath, ok := config["basePath"].(string); ok && basePath != "" {
- store.basePath = strings.TrimSuffix(basePath, "/")
- }
- }
- glog.V(2).Infof("Initialized FilerRoleStore with basePath %s", store.basePath)
- return store, nil
- }
- // StoreRole stores a role definition in filer
- func (f *FilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
- // Use provider function if filerAddress is not provided
- if filerAddress == "" && f.filerAddressProvider != nil {
- filerAddress = f.filerAddressProvider()
- }
- if filerAddress == "" {
- return fmt.Errorf("filer address is required for FilerRoleStore")
- }
- if roleName == "" {
- return fmt.Errorf("role name cannot be empty")
- }
- if role == nil {
- return fmt.Errorf("role cannot be nil")
- }
- // Serialize role to JSON
- roleData, err := json.MarshalIndent(role, "", " ")
- if err != nil {
- return fmt.Errorf("failed to serialize role: %v", err)
- }
- rolePath := f.getRolePath(roleName)
- // Store in filer
- return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.CreateEntryRequest{
- Directory: f.basePath,
- Entry: &filer_pb.Entry{
- Name: f.getRoleFileName(roleName),
- IsDirectory: false,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(0600), // Read/write for owner only
- Uid: uint32(0),
- Gid: uint32(0),
- },
- Content: roleData,
- },
- }
- glog.V(3).Infof("Storing role %s at %s", roleName, rolePath)
- _, err := client.CreateEntry(ctx, request)
- if err != nil {
- return fmt.Errorf("failed to store role %s: %v", roleName, err)
- }
- return nil
- })
- }
- // GetRole retrieves a role definition from filer
- func (f *FilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
- // Use provider function if filerAddress is not provided
- if filerAddress == "" && f.filerAddressProvider != nil {
- filerAddress = f.filerAddressProvider()
- }
- if filerAddress == "" {
- return nil, fmt.Errorf("filer address is required for FilerRoleStore")
- }
- if roleName == "" {
- return nil, fmt.Errorf("role name cannot be empty")
- }
- var roleData []byte
- err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.LookupDirectoryEntryRequest{
- Directory: f.basePath,
- Name: f.getRoleFileName(roleName),
- }
- glog.V(3).Infof("Looking up role %s", roleName)
- response, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- return fmt.Errorf("role not found: %v", err)
- }
- if response.Entry == nil {
- return fmt.Errorf("role not found")
- }
- roleData = response.Entry.Content
- return nil
- })
- if err != nil {
- return nil, err
- }
- // Deserialize role from JSON
- var role RoleDefinition
- if err := json.Unmarshal(roleData, &role); err != nil {
- return nil, fmt.Errorf("failed to deserialize role: %v", err)
- }
- return &role, nil
- }
- // ListRoles lists all role names in filer
- func (f *FilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
- // Use provider function if filerAddress is not provided
- if filerAddress == "" && f.filerAddressProvider != nil {
- filerAddress = f.filerAddressProvider()
- }
- if filerAddress == "" {
- return nil, fmt.Errorf("filer address is required for FilerRoleStore")
- }
- var roleNames []string
- err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.ListEntriesRequest{
- Directory: f.basePath,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 1000, // Process in batches of 1000
- }
- glog.V(3).Infof("Listing roles in %s", f.basePath)
- stream, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("failed to list roles: %v", err)
- }
- for {
- resp, err := stream.Recv()
- if err != nil {
- break // End of stream or error
- }
- if resp.Entry == nil || resp.Entry.IsDirectory {
- continue
- }
- // Extract role name from filename
- filename := resp.Entry.Name
- if strings.HasSuffix(filename, ".json") {
- roleName := strings.TrimSuffix(filename, ".json")
- roleNames = append(roleNames, roleName)
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return roleNames, nil
- }
- // DeleteRole deletes a role definition from filer
- func (f *FilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
- // Use provider function if filerAddress is not provided
- if filerAddress == "" && f.filerAddressProvider != nil {
- filerAddress = f.filerAddressProvider()
- }
- if filerAddress == "" {
- return fmt.Errorf("filer address is required for FilerRoleStore")
- }
- if roleName == "" {
- return fmt.Errorf("role name cannot be empty")
- }
- return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.DeleteEntryRequest{
- Directory: f.basePath,
- Name: f.getRoleFileName(roleName),
- IsDeleteData: true,
- }
- glog.V(3).Infof("Deleting role %s", roleName)
- resp, err := client.DeleteEntry(ctx, request)
- if err != nil {
- if strings.Contains(err.Error(), "not found") {
- return nil // Idempotent: deletion of non-existent role is successful
- }
- return fmt.Errorf("failed to delete role %s: %v", roleName, err)
- }
- if resp.Error != "" {
- if strings.Contains(resp.Error, "not found") {
- return nil // Idempotent: deletion of non-existent role is successful
- }
- return fmt.Errorf("failed to delete role %s: %s", roleName, resp.Error)
- }
- return nil
- })
- }
- // Helper methods for FilerRoleStore
- func (f *FilerRoleStore) getRoleFileName(roleName string) string {
- return roleName + ".json"
- }
- func (f *FilerRoleStore) getRolePath(roleName string) string {
- return f.basePath + "/" + f.getRoleFileName(roleName)
- }
- func (f *FilerRoleStore) withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
- if filerAddress == "" {
- return fmt.Errorf("filer address is required for FilerRoleStore")
- }
- return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), f.grpcDialOption, fn)
- }
- // CachedFilerRoleStore implements RoleStore with TTL caching on top of FilerRoleStore
- type CachedFilerRoleStore struct {
- filerStore *FilerRoleStore
- cache *ccache.Cache
- listCache *ccache.Cache
- ttl time.Duration
- listTTL time.Duration
- }
- // CachedFilerRoleStoreConfig holds configuration for the cached role store
- type CachedFilerRoleStoreConfig struct {
- BasePath string `json:"basePath,omitempty"`
- TTL string `json:"ttl,omitempty"` // e.g., "5m", "1h"
- ListTTL string `json:"listTtl,omitempty"` // e.g., "1m", "30s"
- MaxCacheSize int `json:"maxCacheSize,omitempty"` // Maximum number of cached roles
- }
- // NewCachedFilerRoleStore creates a new cached filer-based role store
- func NewCachedFilerRoleStore(config map[string]interface{}) (*CachedFilerRoleStore, error) {
- // Create underlying filer store
- filerStore, err := NewFilerRoleStore(config, nil)
- if err != nil {
- return nil, fmt.Errorf("failed to create filer role store: %w", err)
- }
- // Parse cache configuration with defaults
- cacheTTL := 5 * time.Minute // Default 5 minutes for role cache
- listTTL := 1 * time.Minute // Default 1 minute for list cache
- maxCacheSize := 1000 // Default max 1000 cached roles
- if config != nil {
- if ttlStr, ok := config["ttl"].(string); ok && ttlStr != "" {
- if parsed, err := time.ParseDuration(ttlStr); err == nil {
- cacheTTL = parsed
- }
- }
- if listTTLStr, ok := config["listTtl"].(string); ok && listTTLStr != "" {
- if parsed, err := time.ParseDuration(listTTLStr); err == nil {
- listTTL = parsed
- }
- }
- if maxSize, ok := config["maxCacheSize"].(int); ok && maxSize > 0 {
- maxCacheSize = maxSize
- }
- }
- // Create ccache instances with appropriate configurations
- pruneCount := int64(maxCacheSize) >> 3
- if pruneCount <= 0 {
- pruneCount = 100
- }
- store := &CachedFilerRoleStore{
- filerStore: filerStore,
- cache: ccache.New(ccache.Configure().MaxSize(int64(maxCacheSize)).ItemsToPrune(uint32(pruneCount))),
- listCache: ccache.New(ccache.Configure().MaxSize(100).ItemsToPrune(10)), // Smaller cache for lists
- ttl: cacheTTL,
- listTTL: listTTL,
- }
- glog.V(2).Infof("Initialized CachedFilerRoleStore with TTL %v, List TTL %v, Max Cache Size %d",
- cacheTTL, listTTL, maxCacheSize)
- return store, nil
- }
- // StoreRole stores a role definition and invalidates the cache
- func (c *CachedFilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
- // Store in filer
- err := c.filerStore.StoreRole(ctx, filerAddress, roleName, role)
- if err != nil {
- return err
- }
- // Invalidate cache entries
- c.cache.Delete(roleName)
- c.listCache.Clear() // Invalidate list cache
- glog.V(3).Infof("Stored and invalidated cache for role %s", roleName)
- return nil
- }
- // GetRole retrieves a role definition with caching
- func (c *CachedFilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
- // Try to get from cache first
- item := c.cache.Get(roleName)
- if item != nil {
- // Cache hit - return cached role (DO NOT extend TTL)
- role := item.Value().(*RoleDefinition)
- glog.V(4).Infof("Cache hit for role %s", roleName)
- return copyRoleDefinition(role), nil
- }
- // Cache miss - fetch from filer
- glog.V(4).Infof("Cache miss for role %s, fetching from filer", roleName)
- role, err := c.filerStore.GetRole(ctx, filerAddress, roleName)
- if err != nil {
- return nil, err
- }
- // Cache the result with TTL
- c.cache.Set(roleName, copyRoleDefinition(role), c.ttl)
- glog.V(3).Infof("Cached role %s with TTL %v", roleName, c.ttl)
- return role, nil
- }
- // ListRoles lists all role names with caching
- func (c *CachedFilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
- // Use a constant key for the role list cache
- const listCacheKey = "role_list"
- // Try to get from list cache first
- item := c.listCache.Get(listCacheKey)
- if item != nil {
- // Cache hit - return cached list (DO NOT extend TTL)
- roles := item.Value().([]string)
- glog.V(4).Infof("List cache hit, returning %d roles", len(roles))
- return append([]string(nil), roles...), nil // Return a copy
- }
- // Cache miss - fetch from filer
- glog.V(4).Infof("List cache miss, fetching from filer")
- roles, err := c.filerStore.ListRoles(ctx, filerAddress)
- if err != nil {
- return nil, err
- }
- // Cache the result with TTL (store a copy)
- rolesCopy := append([]string(nil), roles...)
- c.listCache.Set(listCacheKey, rolesCopy, c.listTTL)
- glog.V(3).Infof("Cached role list with %d entries, TTL %v", len(roles), c.listTTL)
- return roles, nil
- }
- // DeleteRole deletes a role definition and invalidates the cache
- func (c *CachedFilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
- // Delete from filer
- err := c.filerStore.DeleteRole(ctx, filerAddress, roleName)
- if err != nil {
- return err
- }
- // Invalidate cache entries
- c.cache.Delete(roleName)
- c.listCache.Clear() // Invalidate list cache
- glog.V(3).Infof("Deleted and invalidated cache for role %s", roleName)
- return nil
- }
- // ClearCache clears all cached entries (for testing or manual cache invalidation)
- func (c *CachedFilerRoleStore) ClearCache() {
- c.cache.Clear()
- c.listCache.Clear()
- glog.V(2).Infof("Cleared all role cache entries")
- }
- // GetCacheStats returns cache statistics
- func (c *CachedFilerRoleStore) GetCacheStats() map[string]interface{} {
- return map[string]interface{}{
- "roleCache": map[string]interface{}{
- "size": c.cache.ItemCount(),
- "ttl": c.ttl.String(),
- },
- "listCache": map[string]interface{}{
- "size": c.listCache.ItemCount(),
- "ttl": c.listTTL.String(),
- },
- }
- }
|