role_store.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. package integration
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/karlseguin/ccache/v2"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/iam/policy"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)
  16. // RoleStore defines the interface for storing IAM role definitions
  17. type RoleStore interface {
  18. // StoreRole stores a role definition (filerAddress ignored for memory stores)
  19. StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error
  20. // GetRole retrieves a role definition (filerAddress ignored for memory stores)
  21. GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error)
  22. // ListRoles lists all role names (filerAddress ignored for memory stores)
  23. ListRoles(ctx context.Context, filerAddress string) ([]string, error)
  24. // DeleteRole deletes a role definition (filerAddress ignored for memory stores)
  25. DeleteRole(ctx context.Context, filerAddress string, roleName string) error
  26. }
  27. // MemoryRoleStore implements RoleStore using in-memory storage
  28. type MemoryRoleStore struct {
  29. roles map[string]*RoleDefinition
  30. mutex sync.RWMutex
  31. }
  32. // NewMemoryRoleStore creates a new memory-based role store
  33. func NewMemoryRoleStore() *MemoryRoleStore {
  34. return &MemoryRoleStore{
  35. roles: make(map[string]*RoleDefinition),
  36. }
  37. }
  38. // StoreRole stores a role definition in memory (filerAddress ignored for memory store)
  39. func (m *MemoryRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
  40. if roleName == "" {
  41. return fmt.Errorf("role name cannot be empty")
  42. }
  43. if role == nil {
  44. return fmt.Errorf("role cannot be nil")
  45. }
  46. m.mutex.Lock()
  47. defer m.mutex.Unlock()
  48. // Deep copy the role to prevent external modifications
  49. m.roles[roleName] = copyRoleDefinition(role)
  50. return nil
  51. }
  52. // GetRole retrieves a role definition from memory (filerAddress ignored for memory store)
  53. func (m *MemoryRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
  54. if roleName == "" {
  55. return nil, fmt.Errorf("role name cannot be empty")
  56. }
  57. m.mutex.RLock()
  58. defer m.mutex.RUnlock()
  59. role, exists := m.roles[roleName]
  60. if !exists {
  61. return nil, fmt.Errorf("role not found: %s", roleName)
  62. }
  63. // Return a copy to prevent external modifications
  64. return copyRoleDefinition(role), nil
  65. }
  66. // ListRoles lists all role names in memory (filerAddress ignored for memory store)
  67. func (m *MemoryRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
  68. m.mutex.RLock()
  69. defer m.mutex.RUnlock()
  70. names := make([]string, 0, len(m.roles))
  71. for name := range m.roles {
  72. names = append(names, name)
  73. }
  74. return names, nil
  75. }
  76. // DeleteRole deletes a role definition from memory (filerAddress ignored for memory store)
  77. func (m *MemoryRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
  78. if roleName == "" {
  79. return fmt.Errorf("role name cannot be empty")
  80. }
  81. m.mutex.Lock()
  82. defer m.mutex.Unlock()
  83. delete(m.roles, roleName)
  84. return nil
  85. }
  86. // copyRoleDefinition creates a deep copy of a role definition
  87. func copyRoleDefinition(original *RoleDefinition) *RoleDefinition {
  88. if original == nil {
  89. return nil
  90. }
  91. copied := &RoleDefinition{
  92. RoleName: original.RoleName,
  93. RoleArn: original.RoleArn,
  94. Description: original.Description,
  95. }
  96. // Deep copy trust policy if it exists
  97. if original.TrustPolicy != nil {
  98. // Use JSON marshaling for deep copy of the complex policy structure
  99. trustPolicyData, _ := json.Marshal(original.TrustPolicy)
  100. var trustPolicyCopy policy.PolicyDocument
  101. json.Unmarshal(trustPolicyData, &trustPolicyCopy)
  102. copied.TrustPolicy = &trustPolicyCopy
  103. }
  104. // Copy attached policies slice
  105. if original.AttachedPolicies != nil {
  106. copied.AttachedPolicies = make([]string, len(original.AttachedPolicies))
  107. copy(copied.AttachedPolicies, original.AttachedPolicies)
  108. }
  109. return copied
  110. }
  111. // FilerRoleStore implements RoleStore using SeaweedFS filer
  112. type FilerRoleStore struct {
  113. grpcDialOption grpc.DialOption
  114. basePath string
  115. filerAddressProvider func() string
  116. }
  117. // NewFilerRoleStore creates a new filer-based role store
  118. func NewFilerRoleStore(config map[string]interface{}, filerAddressProvider func() string) (*FilerRoleStore, error) {
  119. store := &FilerRoleStore{
  120. basePath: "/etc/iam/roles", // Default path for role storage - aligned with /etc/ convention
  121. filerAddressProvider: filerAddressProvider,
  122. }
  123. // Parse configuration - only basePath and other settings, NOT filerAddress
  124. if config != nil {
  125. if basePath, ok := config["basePath"].(string); ok && basePath != "" {
  126. store.basePath = strings.TrimSuffix(basePath, "/")
  127. }
  128. }
  129. glog.V(2).Infof("Initialized FilerRoleStore with basePath %s", store.basePath)
  130. return store, nil
  131. }
  132. // StoreRole stores a role definition in filer
  133. func (f *FilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
  134. // Use provider function if filerAddress is not provided
  135. if filerAddress == "" && f.filerAddressProvider != nil {
  136. filerAddress = f.filerAddressProvider()
  137. }
  138. if filerAddress == "" {
  139. return fmt.Errorf("filer address is required for FilerRoleStore")
  140. }
  141. if roleName == "" {
  142. return fmt.Errorf("role name cannot be empty")
  143. }
  144. if role == nil {
  145. return fmt.Errorf("role cannot be nil")
  146. }
  147. // Serialize role to JSON
  148. roleData, err := json.MarshalIndent(role, "", " ")
  149. if err != nil {
  150. return fmt.Errorf("failed to serialize role: %v", err)
  151. }
  152. rolePath := f.getRolePath(roleName)
  153. // Store in filer
  154. return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  155. request := &filer_pb.CreateEntryRequest{
  156. Directory: f.basePath,
  157. Entry: &filer_pb.Entry{
  158. Name: f.getRoleFileName(roleName),
  159. IsDirectory: false,
  160. Attributes: &filer_pb.FuseAttributes{
  161. Mtime: time.Now().Unix(),
  162. Crtime: time.Now().Unix(),
  163. FileMode: uint32(0600), // Read/write for owner only
  164. Uid: uint32(0),
  165. Gid: uint32(0),
  166. },
  167. Content: roleData,
  168. },
  169. }
  170. glog.V(3).Infof("Storing role %s at %s", roleName, rolePath)
  171. _, err := client.CreateEntry(ctx, request)
  172. if err != nil {
  173. return fmt.Errorf("failed to store role %s: %v", roleName, err)
  174. }
  175. return nil
  176. })
  177. }
  178. // GetRole retrieves a role definition from filer
  179. func (f *FilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
  180. // Use provider function if filerAddress is not provided
  181. if filerAddress == "" && f.filerAddressProvider != nil {
  182. filerAddress = f.filerAddressProvider()
  183. }
  184. if filerAddress == "" {
  185. return nil, fmt.Errorf("filer address is required for FilerRoleStore")
  186. }
  187. if roleName == "" {
  188. return nil, fmt.Errorf("role name cannot be empty")
  189. }
  190. var roleData []byte
  191. err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  192. request := &filer_pb.LookupDirectoryEntryRequest{
  193. Directory: f.basePath,
  194. Name: f.getRoleFileName(roleName),
  195. }
  196. glog.V(3).Infof("Looking up role %s", roleName)
  197. response, err := client.LookupDirectoryEntry(ctx, request)
  198. if err != nil {
  199. return fmt.Errorf("role not found: %v", err)
  200. }
  201. if response.Entry == nil {
  202. return fmt.Errorf("role not found")
  203. }
  204. roleData = response.Entry.Content
  205. return nil
  206. })
  207. if err != nil {
  208. return nil, err
  209. }
  210. // Deserialize role from JSON
  211. var role RoleDefinition
  212. if err := json.Unmarshal(roleData, &role); err != nil {
  213. return nil, fmt.Errorf("failed to deserialize role: %v", err)
  214. }
  215. return &role, nil
  216. }
  217. // ListRoles lists all role names in filer
  218. func (f *FilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
  219. // Use provider function if filerAddress is not provided
  220. if filerAddress == "" && f.filerAddressProvider != nil {
  221. filerAddress = f.filerAddressProvider()
  222. }
  223. if filerAddress == "" {
  224. return nil, fmt.Errorf("filer address is required for FilerRoleStore")
  225. }
  226. var roleNames []string
  227. err := f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  228. request := &filer_pb.ListEntriesRequest{
  229. Directory: f.basePath,
  230. Prefix: "",
  231. StartFromFileName: "",
  232. InclusiveStartFrom: false,
  233. Limit: 1000, // Process in batches of 1000
  234. }
  235. glog.V(3).Infof("Listing roles in %s", f.basePath)
  236. stream, err := client.ListEntries(ctx, request)
  237. if err != nil {
  238. return fmt.Errorf("failed to list roles: %v", err)
  239. }
  240. for {
  241. resp, err := stream.Recv()
  242. if err != nil {
  243. break // End of stream or error
  244. }
  245. if resp.Entry == nil || resp.Entry.IsDirectory {
  246. continue
  247. }
  248. // Extract role name from filename
  249. filename := resp.Entry.Name
  250. if strings.HasSuffix(filename, ".json") {
  251. roleName := strings.TrimSuffix(filename, ".json")
  252. roleNames = append(roleNames, roleName)
  253. }
  254. }
  255. return nil
  256. })
  257. if err != nil {
  258. return nil, err
  259. }
  260. return roleNames, nil
  261. }
  262. // DeleteRole deletes a role definition from filer
  263. func (f *FilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
  264. // Use provider function if filerAddress is not provided
  265. if filerAddress == "" && f.filerAddressProvider != nil {
  266. filerAddress = f.filerAddressProvider()
  267. }
  268. if filerAddress == "" {
  269. return fmt.Errorf("filer address is required for FilerRoleStore")
  270. }
  271. if roleName == "" {
  272. return fmt.Errorf("role name cannot be empty")
  273. }
  274. return f.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  275. request := &filer_pb.DeleteEntryRequest{
  276. Directory: f.basePath,
  277. Name: f.getRoleFileName(roleName),
  278. IsDeleteData: true,
  279. }
  280. glog.V(3).Infof("Deleting role %s", roleName)
  281. resp, err := client.DeleteEntry(ctx, request)
  282. if err != nil {
  283. if strings.Contains(err.Error(), "not found") {
  284. return nil // Idempotent: deletion of non-existent role is successful
  285. }
  286. return fmt.Errorf("failed to delete role %s: %v", roleName, err)
  287. }
  288. if resp.Error != "" {
  289. if strings.Contains(resp.Error, "not found") {
  290. return nil // Idempotent: deletion of non-existent role is successful
  291. }
  292. return fmt.Errorf("failed to delete role %s: %s", roleName, resp.Error)
  293. }
  294. return nil
  295. })
  296. }
  297. // Helper methods for FilerRoleStore
  298. func (f *FilerRoleStore) getRoleFileName(roleName string) string {
  299. return roleName + ".json"
  300. }
  301. func (f *FilerRoleStore) getRolePath(roleName string) string {
  302. return f.basePath + "/" + f.getRoleFileName(roleName)
  303. }
  304. func (f *FilerRoleStore) withFilerClient(filerAddress string, fn func(filer_pb.SeaweedFilerClient) error) error {
  305. if filerAddress == "" {
  306. return fmt.Errorf("filer address is required for FilerRoleStore")
  307. }
  308. return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), f.grpcDialOption, fn)
  309. }
  310. // CachedFilerRoleStore implements RoleStore with TTL caching on top of FilerRoleStore
  311. type CachedFilerRoleStore struct {
  312. filerStore *FilerRoleStore
  313. cache *ccache.Cache
  314. listCache *ccache.Cache
  315. ttl time.Duration
  316. listTTL time.Duration
  317. }
  318. // CachedFilerRoleStoreConfig holds configuration for the cached role store
  319. type CachedFilerRoleStoreConfig struct {
  320. BasePath string `json:"basePath,omitempty"`
  321. TTL string `json:"ttl,omitempty"` // e.g., "5m", "1h"
  322. ListTTL string `json:"listTtl,omitempty"` // e.g., "1m", "30s"
  323. MaxCacheSize int `json:"maxCacheSize,omitempty"` // Maximum number of cached roles
  324. }
  325. // NewCachedFilerRoleStore creates a new cached filer-based role store
  326. func NewCachedFilerRoleStore(config map[string]interface{}) (*CachedFilerRoleStore, error) {
  327. // Create underlying filer store
  328. filerStore, err := NewFilerRoleStore(config, nil)
  329. if err != nil {
  330. return nil, fmt.Errorf("failed to create filer role store: %w", err)
  331. }
  332. // Parse cache configuration with defaults
  333. cacheTTL := 5 * time.Minute // Default 5 minutes for role cache
  334. listTTL := 1 * time.Minute // Default 1 minute for list cache
  335. maxCacheSize := 1000 // Default max 1000 cached roles
  336. if config != nil {
  337. if ttlStr, ok := config["ttl"].(string); ok && ttlStr != "" {
  338. if parsed, err := time.ParseDuration(ttlStr); err == nil {
  339. cacheTTL = parsed
  340. }
  341. }
  342. if listTTLStr, ok := config["listTtl"].(string); ok && listTTLStr != "" {
  343. if parsed, err := time.ParseDuration(listTTLStr); err == nil {
  344. listTTL = parsed
  345. }
  346. }
  347. if maxSize, ok := config["maxCacheSize"].(int); ok && maxSize > 0 {
  348. maxCacheSize = maxSize
  349. }
  350. }
  351. // Create ccache instances with appropriate configurations
  352. pruneCount := int64(maxCacheSize) >> 3
  353. if pruneCount <= 0 {
  354. pruneCount = 100
  355. }
  356. store := &CachedFilerRoleStore{
  357. filerStore: filerStore,
  358. cache: ccache.New(ccache.Configure().MaxSize(int64(maxCacheSize)).ItemsToPrune(uint32(pruneCount))),
  359. listCache: ccache.New(ccache.Configure().MaxSize(100).ItemsToPrune(10)), // Smaller cache for lists
  360. ttl: cacheTTL,
  361. listTTL: listTTL,
  362. }
  363. glog.V(2).Infof("Initialized CachedFilerRoleStore with TTL %v, List TTL %v, Max Cache Size %d",
  364. cacheTTL, listTTL, maxCacheSize)
  365. return store, nil
  366. }
  367. // StoreRole stores a role definition and invalidates the cache
  368. func (c *CachedFilerRoleStore) StoreRole(ctx context.Context, filerAddress string, roleName string, role *RoleDefinition) error {
  369. // Store in filer
  370. err := c.filerStore.StoreRole(ctx, filerAddress, roleName, role)
  371. if err != nil {
  372. return err
  373. }
  374. // Invalidate cache entries
  375. c.cache.Delete(roleName)
  376. c.listCache.Clear() // Invalidate list cache
  377. glog.V(3).Infof("Stored and invalidated cache for role %s", roleName)
  378. return nil
  379. }
  380. // GetRole retrieves a role definition with caching
  381. func (c *CachedFilerRoleStore) GetRole(ctx context.Context, filerAddress string, roleName string) (*RoleDefinition, error) {
  382. // Try to get from cache first
  383. item := c.cache.Get(roleName)
  384. if item != nil {
  385. // Cache hit - return cached role (DO NOT extend TTL)
  386. role := item.Value().(*RoleDefinition)
  387. glog.V(4).Infof("Cache hit for role %s", roleName)
  388. return copyRoleDefinition(role), nil
  389. }
  390. // Cache miss - fetch from filer
  391. glog.V(4).Infof("Cache miss for role %s, fetching from filer", roleName)
  392. role, err := c.filerStore.GetRole(ctx, filerAddress, roleName)
  393. if err != nil {
  394. return nil, err
  395. }
  396. // Cache the result with TTL
  397. c.cache.Set(roleName, copyRoleDefinition(role), c.ttl)
  398. glog.V(3).Infof("Cached role %s with TTL %v", roleName, c.ttl)
  399. return role, nil
  400. }
  401. // ListRoles lists all role names with caching
  402. func (c *CachedFilerRoleStore) ListRoles(ctx context.Context, filerAddress string) ([]string, error) {
  403. // Use a constant key for the role list cache
  404. const listCacheKey = "role_list"
  405. // Try to get from list cache first
  406. item := c.listCache.Get(listCacheKey)
  407. if item != nil {
  408. // Cache hit - return cached list (DO NOT extend TTL)
  409. roles := item.Value().([]string)
  410. glog.V(4).Infof("List cache hit, returning %d roles", len(roles))
  411. return append([]string(nil), roles...), nil // Return a copy
  412. }
  413. // Cache miss - fetch from filer
  414. glog.V(4).Infof("List cache miss, fetching from filer")
  415. roles, err := c.filerStore.ListRoles(ctx, filerAddress)
  416. if err != nil {
  417. return nil, err
  418. }
  419. // Cache the result with TTL (store a copy)
  420. rolesCopy := append([]string(nil), roles...)
  421. c.listCache.Set(listCacheKey, rolesCopy, c.listTTL)
  422. glog.V(3).Infof("Cached role list with %d entries, TTL %v", len(roles), c.listTTL)
  423. return roles, nil
  424. }
  425. // DeleteRole deletes a role definition and invalidates the cache
  426. func (c *CachedFilerRoleStore) DeleteRole(ctx context.Context, filerAddress string, roleName string) error {
  427. // Delete from filer
  428. err := c.filerStore.DeleteRole(ctx, filerAddress, roleName)
  429. if err != nil {
  430. return err
  431. }
  432. // Invalidate cache entries
  433. c.cache.Delete(roleName)
  434. c.listCache.Clear() // Invalidate list cache
  435. glog.V(3).Infof("Deleted and invalidated cache for role %s", roleName)
  436. return nil
  437. }
  438. // ClearCache clears all cached entries (for testing or manual cache invalidation)
  439. func (c *CachedFilerRoleStore) ClearCache() {
  440. c.cache.Clear()
  441. c.listCache.Clear()
  442. glog.V(2).Infof("Cleared all role cache entries")
  443. }
  444. // GetCacheStats returns cache statistics
  445. func (c *CachedFilerRoleStore) GetCacheStats() map[string]interface{} {
  446. return map[string]interface{}{
  447. "roleCache": map[string]interface{}{
  448. "size": c.cache.ItemCount(),
  449. "ttl": c.ttl.String(),
  450. },
  451. "listCache": map[string]interface{}{
  452. "size": c.listCache.ItemCount(),
  453. "ttl": c.listTTL.String(),
  454. },
  455. }
  456. }