policy_store.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package policy
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "google.golang.org/grpc"
  13. )
  14. // MemoryPolicyStore implements PolicyStore using in-memory storage
  15. type MemoryPolicyStore struct {
  16. policies map[string]*PolicyDocument
  17. mutex sync.RWMutex
  18. }
  19. // NewMemoryPolicyStore creates a new memory-based policy store
  20. func NewMemoryPolicyStore() *MemoryPolicyStore {
  21. return &MemoryPolicyStore{
  22. policies: make(map[string]*PolicyDocument),
  23. }
  24. }
  25. // StorePolicy stores a policy document in memory (filerAddress ignored for memory store)
  26. func (s *MemoryPolicyStore) StorePolicy(ctx context.Context, filerAddress string, name string, policy *PolicyDocument) error {
  27. if name == "" {
  28. return fmt.Errorf("policy name cannot be empty")
  29. }
  30. if policy == nil {
  31. return fmt.Errorf("policy cannot be nil")
  32. }
  33. s.mutex.Lock()
  34. defer s.mutex.Unlock()
  35. // Deep copy the policy to prevent external modifications
  36. s.policies[name] = copyPolicyDocument(policy)
  37. return nil
  38. }
  39. // GetPolicy retrieves a policy document from memory (filerAddress ignored for memory store)
  40. func (s *MemoryPolicyStore) GetPolicy(ctx context.Context, filerAddress string, name string) (*PolicyDocument, error) {
  41. if name == "" {
  42. return nil, fmt.Errorf("policy name cannot be empty")
  43. }
  44. s.mutex.RLock()
  45. defer s.mutex.RUnlock()
  46. policy, exists := s.policies[name]
  47. if !exists {
  48. return nil, fmt.Errorf("policy not found: %s", name)
  49. }
  50. // Return a copy to prevent external modifications
  51. return copyPolicyDocument(policy), nil
  52. }
  53. // DeletePolicy deletes a policy document from memory (filerAddress ignored for memory store)
  54. func (s *MemoryPolicyStore) DeletePolicy(ctx context.Context, filerAddress string, name string) error {
  55. if name == "" {
  56. return fmt.Errorf("policy name cannot be empty")
  57. }
  58. s.mutex.Lock()
  59. defer s.mutex.Unlock()
  60. delete(s.policies, name)
  61. return nil
  62. }
  63. // ListPolicies lists all policy names in memory (filerAddress ignored for memory store)
  64. func (s *MemoryPolicyStore) ListPolicies(ctx context.Context, filerAddress string) ([]string, error) {
  65. s.mutex.RLock()
  66. defer s.mutex.RUnlock()
  67. names := make([]string, 0, len(s.policies))
  68. for name := range s.policies {
  69. names = append(names, name)
  70. }
  71. return names, nil
  72. }
  73. // copyPolicyDocument creates a deep copy of a policy document
  74. func copyPolicyDocument(original *PolicyDocument) *PolicyDocument {
  75. if original == nil {
  76. return nil
  77. }
  78. copied := &PolicyDocument{
  79. Version: original.Version,
  80. Id: original.Id,
  81. }
  82. // Copy statements
  83. copied.Statement = make([]Statement, len(original.Statement))
  84. for i, stmt := range original.Statement {
  85. copied.Statement[i] = Statement{
  86. Sid: stmt.Sid,
  87. Effect: stmt.Effect,
  88. Principal: stmt.Principal,
  89. NotPrincipal: stmt.NotPrincipal,
  90. }
  91. // Copy action slice
  92. if stmt.Action != nil {
  93. copied.Statement[i].Action = make([]string, len(stmt.Action))
  94. copy(copied.Statement[i].Action, stmt.Action)
  95. }
  96. // Copy NotAction slice
  97. if stmt.NotAction != nil {
  98. copied.Statement[i].NotAction = make([]string, len(stmt.NotAction))
  99. copy(copied.Statement[i].NotAction, stmt.NotAction)
  100. }
  101. // Copy resource slice
  102. if stmt.Resource != nil {
  103. copied.Statement[i].Resource = make([]string, len(stmt.Resource))
  104. copy(copied.Statement[i].Resource, stmt.Resource)
  105. }
  106. // Copy NotResource slice
  107. if stmt.NotResource != nil {
  108. copied.Statement[i].NotResource = make([]string, len(stmt.NotResource))
  109. copy(copied.Statement[i].NotResource, stmt.NotResource)
  110. }
  111. // Copy condition map (shallow copy for now)
  112. if stmt.Condition != nil {
  113. copied.Statement[i].Condition = make(map[string]map[string]interface{})
  114. for k, v := range stmt.Condition {
  115. copied.Statement[i].Condition[k] = v
  116. }
  117. }
  118. }
  119. return copied
  120. }
  121. // FilerPolicyStore implements PolicyStore using SeaweedFS filer
  122. type FilerPolicyStore struct {
  123. grpcDialOption grpc.DialOption
  124. basePath string
  125. filerAddressProvider func() string
  126. }
  127. // NewFilerPolicyStore creates a new filer-based policy store
  128. func NewFilerPolicyStore(config map[string]interface{}, filerAddressProvider func() string) (*FilerPolicyStore, error) {
  129. store := &FilerPolicyStore{
  130. basePath: "/etc/iam/policies", // Default path for policy storage - aligned with /etc/ convention
  131. filerAddressProvider: filerAddressProvider,
  132. }
  133. // Parse configuration - only basePath and other settings, NOT filerAddress
  134. if config != nil {
  135. if basePath, ok := config["basePath"].(string); ok && basePath != "" {
  136. store.basePath = strings.TrimSuffix(basePath, "/")
  137. }
  138. }
  139. glog.V(2).Infof("Initialized FilerPolicyStore with basePath %s", store.basePath)
  140. return store, nil
  141. }
  142. // StorePolicy stores a policy document in filer
  143. func (s *FilerPolicyStore) StorePolicy(ctx context.Context, filerAddress string, name string, policy *PolicyDocument) error {
  144. // Use provider function if filerAddress is not provided
  145. if filerAddress == "" && s.filerAddressProvider != nil {
  146. filerAddress = s.filerAddressProvider()
  147. }
  148. if filerAddress == "" {
  149. return fmt.Errorf("filer address is required for FilerPolicyStore")
  150. }
  151. if name == "" {
  152. return fmt.Errorf("policy name cannot be empty")
  153. }
  154. if policy == nil {
  155. return fmt.Errorf("policy cannot be nil")
  156. }
  157. // Serialize policy to JSON
  158. policyData, err := json.MarshalIndent(policy, "", " ")
  159. if err != nil {
  160. return fmt.Errorf("failed to serialize policy: %v", err)
  161. }
  162. policyPath := s.getPolicyPath(name)
  163. // Store in filer
  164. return s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  165. request := &filer_pb.CreateEntryRequest{
  166. Directory: s.basePath,
  167. Entry: &filer_pb.Entry{
  168. Name: s.getPolicyFileName(name),
  169. IsDirectory: false,
  170. Attributes: &filer_pb.FuseAttributes{
  171. Mtime: time.Now().Unix(),
  172. Crtime: time.Now().Unix(),
  173. FileMode: uint32(0600), // Read/write for owner only
  174. Uid: uint32(0),
  175. Gid: uint32(0),
  176. },
  177. Content: policyData,
  178. },
  179. }
  180. glog.V(3).Infof("Storing policy %s at %s", name, policyPath)
  181. _, err := client.CreateEntry(ctx, request)
  182. if err != nil {
  183. return fmt.Errorf("failed to store policy %s: %v", name, err)
  184. }
  185. return nil
  186. })
  187. }
  188. // GetPolicy retrieves a policy document from filer
  189. func (s *FilerPolicyStore) GetPolicy(ctx context.Context, filerAddress string, name string) (*PolicyDocument, error) {
  190. // Use provider function if filerAddress is not provided
  191. if filerAddress == "" && s.filerAddressProvider != nil {
  192. filerAddress = s.filerAddressProvider()
  193. }
  194. if filerAddress == "" {
  195. return nil, fmt.Errorf("filer address is required for FilerPolicyStore")
  196. }
  197. if name == "" {
  198. return nil, fmt.Errorf("policy name cannot be empty")
  199. }
  200. var policyData []byte
  201. err := s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  202. request := &filer_pb.LookupDirectoryEntryRequest{
  203. Directory: s.basePath,
  204. Name: s.getPolicyFileName(name),
  205. }
  206. glog.V(3).Infof("Looking up policy %s", name)
  207. response, err := client.LookupDirectoryEntry(ctx, request)
  208. if err != nil {
  209. return fmt.Errorf("policy not found: %v", err)
  210. }
  211. if response.Entry == nil {
  212. return fmt.Errorf("policy not found")
  213. }
  214. policyData = response.Entry.Content
  215. return nil
  216. })
  217. if err != nil {
  218. return nil, err
  219. }
  220. // Deserialize policy from JSON
  221. var policy PolicyDocument
  222. if err := json.Unmarshal(policyData, &policy); err != nil {
  223. return nil, fmt.Errorf("failed to deserialize policy: %v", err)
  224. }
  225. return &policy, nil
  226. }
  227. // DeletePolicy deletes a policy document from filer
  228. func (s *FilerPolicyStore) DeletePolicy(ctx context.Context, filerAddress string, name string) error {
  229. // Use provider function if filerAddress is not provided
  230. if filerAddress == "" && s.filerAddressProvider != nil {
  231. filerAddress = s.filerAddressProvider()
  232. }
  233. if filerAddress == "" {
  234. return fmt.Errorf("filer address is required for FilerPolicyStore")
  235. }
  236. if name == "" {
  237. return fmt.Errorf("policy name cannot be empty")
  238. }
  239. return s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  240. request := &filer_pb.DeleteEntryRequest{
  241. Directory: s.basePath,
  242. Name: s.getPolicyFileName(name),
  243. IsDeleteData: true,
  244. IsRecursive: false,
  245. IgnoreRecursiveError: false,
  246. }
  247. glog.V(3).Infof("Deleting policy %s", name)
  248. resp, err := client.DeleteEntry(ctx, request)
  249. if err != nil {
  250. // Ignore "not found" errors - policy may already be deleted
  251. if strings.Contains(err.Error(), "not found") {
  252. return nil
  253. }
  254. return fmt.Errorf("failed to delete policy %s: %v", name, err)
  255. }
  256. // Check response error
  257. if resp.Error != "" {
  258. // Ignore "not found" errors - policy may already be deleted
  259. if strings.Contains(resp.Error, "not found") {
  260. return nil
  261. }
  262. return fmt.Errorf("failed to delete policy %s: %s", name, resp.Error)
  263. }
  264. return nil
  265. })
  266. }
  267. // ListPolicies lists all policy names in filer
  268. func (s *FilerPolicyStore) ListPolicies(ctx context.Context, filerAddress string) ([]string, error) {
  269. // Use provider function if filerAddress is not provided
  270. if filerAddress == "" && s.filerAddressProvider != nil {
  271. filerAddress = s.filerAddressProvider()
  272. }
  273. if filerAddress == "" {
  274. return nil, fmt.Errorf("filer address is required for FilerPolicyStore")
  275. }
  276. var policyNames []string
  277. err := s.withFilerClient(filerAddress, func(client filer_pb.SeaweedFilerClient) error {
  278. // List all entries in the policy directory
  279. request := &filer_pb.ListEntriesRequest{
  280. Directory: s.basePath,
  281. Prefix: "policy_",
  282. StartFromFileName: "",
  283. InclusiveStartFrom: false,
  284. Limit: 1000, // Process in batches of 1000
  285. }
  286. stream, err := client.ListEntries(ctx, request)
  287. if err != nil {
  288. return fmt.Errorf("failed to list policies: %v", err)
  289. }
  290. for {
  291. resp, err := stream.Recv()
  292. if err != nil {
  293. break // End of stream or error
  294. }
  295. if resp.Entry == nil || resp.Entry.IsDirectory {
  296. continue
  297. }
  298. // Extract policy name from filename
  299. filename := resp.Entry.Name
  300. if strings.HasPrefix(filename, "policy_") && strings.HasSuffix(filename, ".json") {
  301. // Remove "policy_" prefix and ".json" suffix
  302. policyName := strings.TrimSuffix(strings.TrimPrefix(filename, "policy_"), ".json")
  303. policyNames = append(policyNames, policyName)
  304. }
  305. }
  306. return nil
  307. })
  308. if err != nil {
  309. return nil, err
  310. }
  311. return policyNames, nil
  312. }
  313. // Helper methods
  314. // withFilerClient executes a function with a filer client
  315. func (s *FilerPolicyStore) withFilerClient(filerAddress string, fn func(client filer_pb.SeaweedFilerClient) error) error {
  316. if filerAddress == "" {
  317. return fmt.Errorf("filer address is required for FilerPolicyStore")
  318. }
  319. // Use the pb.WithGrpcFilerClient helper similar to existing SeaweedFS code
  320. return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddress), s.grpcDialOption, fn)
  321. }
  322. // getPolicyPath returns the full path for a policy
  323. func (s *FilerPolicyStore) getPolicyPath(policyName string) string {
  324. return s.basePath + "/" + s.getPolicyFileName(policyName)
  325. }
  326. // getPolicyFileName returns the filename for a policy
  327. func (s *FilerPolicyStore) getPolicyFileName(policyName string) string {
  328. return "policy_" + policyName + ".json"
  329. }