postgres_identity.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package postgres
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/credential"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
  9. )
  10. func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
  11. if !store.configured {
  12. return nil, fmt.Errorf("store not configured")
  13. }
  14. config := &iam_pb.S3ApiConfiguration{}
  15. // Query all users
  16. rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
  17. if err != nil {
  18. return nil, fmt.Errorf("failed to query users: %w", err)
  19. }
  20. defer rows.Close()
  21. for rows.Next() {
  22. var username, email string
  23. var accountDataJSON, actionsJSON []byte
  24. if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
  25. return nil, fmt.Errorf("failed to scan user row: %w", err)
  26. }
  27. identity := &iam_pb.Identity{
  28. Name: username,
  29. }
  30. // Parse account data
  31. if len(accountDataJSON) > 0 {
  32. if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
  33. return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
  34. }
  35. }
  36. // Parse actions
  37. if len(actionsJSON) > 0 {
  38. if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
  39. return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
  40. }
  41. }
  42. // Query credentials for this user
  43. credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
  44. if err != nil {
  45. return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
  46. }
  47. for credRows.Next() {
  48. var accessKey, secretKey string
  49. if err := credRows.Scan(&accessKey, &secretKey); err != nil {
  50. credRows.Close()
  51. return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
  52. }
  53. identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
  54. AccessKey: accessKey,
  55. SecretKey: secretKey,
  56. })
  57. }
  58. credRows.Close()
  59. config.Identities = append(config.Identities, identity)
  60. }
  61. return config, nil
  62. }
  63. func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
  64. if !store.configured {
  65. return fmt.Errorf("store not configured")
  66. }
  67. // Start transaction
  68. tx, err := store.db.BeginTx(ctx, nil)
  69. if err != nil {
  70. return fmt.Errorf("failed to begin transaction: %w", err)
  71. }
  72. defer tx.Rollback()
  73. // Clear existing data
  74. if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
  75. return fmt.Errorf("failed to clear credentials: %w", err)
  76. }
  77. if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
  78. return fmt.Errorf("failed to clear users: %w", err)
  79. }
  80. // Insert all identities
  81. for _, identity := range config.Identities {
  82. // Marshal account data
  83. var accountDataJSON []byte
  84. if identity.Account != nil {
  85. accountDataJSON, err = json.Marshal(identity.Account)
  86. if err != nil {
  87. return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
  88. }
  89. }
  90. // Marshal actions
  91. var actionsJSON []byte
  92. if identity.Actions != nil {
  93. actionsJSON, err = json.Marshal(identity.Actions)
  94. if err != nil {
  95. return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
  96. }
  97. }
  98. // Insert user
  99. _, err := tx.ExecContext(ctx,
  100. "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
  101. identity.Name, "", accountDataJSON, actionsJSON)
  102. if err != nil {
  103. return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
  104. }
  105. // Insert credentials
  106. for _, cred := range identity.Credentials {
  107. _, err := tx.ExecContext(ctx,
  108. "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
  109. identity.Name, cred.AccessKey, cred.SecretKey)
  110. if err != nil {
  111. return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
  112. }
  113. }
  114. }
  115. return tx.Commit()
  116. }
  117. func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
  118. if !store.configured {
  119. return fmt.Errorf("store not configured")
  120. }
  121. // Check if user already exists
  122. var count int
  123. err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
  124. if err != nil {
  125. return fmt.Errorf("failed to check user existence: %w", err)
  126. }
  127. if count > 0 {
  128. return credential.ErrUserAlreadyExists
  129. }
  130. // Start transaction
  131. tx, err := store.db.BeginTx(ctx, nil)
  132. if err != nil {
  133. return fmt.Errorf("failed to begin transaction: %w", err)
  134. }
  135. defer tx.Rollback()
  136. // Marshal account data
  137. var accountDataJSON []byte
  138. if identity.Account != nil {
  139. accountDataJSON, err = json.Marshal(identity.Account)
  140. if err != nil {
  141. return fmt.Errorf("failed to marshal account data: %w", err)
  142. }
  143. }
  144. // Marshal actions
  145. var actionsJSON []byte
  146. if identity.Actions != nil {
  147. actionsJSON, err = json.Marshal(identity.Actions)
  148. if err != nil {
  149. return fmt.Errorf("failed to marshal actions: %w", err)
  150. }
  151. }
  152. // Insert user
  153. _, err = tx.ExecContext(ctx,
  154. "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
  155. identity.Name, "", accountDataJSON, actionsJSON)
  156. if err != nil {
  157. return fmt.Errorf("failed to insert user: %w", err)
  158. }
  159. // Insert credentials
  160. for _, cred := range identity.Credentials {
  161. _, err = tx.ExecContext(ctx,
  162. "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
  163. identity.Name, cred.AccessKey, cred.SecretKey)
  164. if err != nil {
  165. return fmt.Errorf("failed to insert credential: %w", err)
  166. }
  167. }
  168. return tx.Commit()
  169. }
  170. func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
  171. if !store.configured {
  172. return nil, fmt.Errorf("store not configured")
  173. }
  174. var email string
  175. var accountDataJSON, actionsJSON []byte
  176. err := store.db.QueryRowContext(ctx,
  177. "SELECT email, account_data, actions FROM users WHERE username = $1",
  178. username).Scan(&email, &accountDataJSON, &actionsJSON)
  179. if err != nil {
  180. if err == sql.ErrNoRows {
  181. return nil, credential.ErrUserNotFound
  182. }
  183. return nil, fmt.Errorf("failed to query user: %w", err)
  184. }
  185. identity := &iam_pb.Identity{
  186. Name: username,
  187. }
  188. // Parse account data
  189. if len(accountDataJSON) > 0 {
  190. if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
  191. return nil, fmt.Errorf("failed to unmarshal account data: %w", err)
  192. }
  193. }
  194. // Parse actions
  195. if len(actionsJSON) > 0 {
  196. if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
  197. return nil, fmt.Errorf("failed to unmarshal actions: %w", err)
  198. }
  199. }
  200. // Query credentials
  201. rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
  202. if err != nil {
  203. return nil, fmt.Errorf("failed to query credentials: %w", err)
  204. }
  205. defer rows.Close()
  206. for rows.Next() {
  207. var accessKey, secretKey string
  208. if err := rows.Scan(&accessKey, &secretKey); err != nil {
  209. return nil, fmt.Errorf("failed to scan credential: %w", err)
  210. }
  211. identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
  212. AccessKey: accessKey,
  213. SecretKey: secretKey,
  214. })
  215. }
  216. return identity, nil
  217. }
  218. func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
  219. if !store.configured {
  220. return fmt.Errorf("store not configured")
  221. }
  222. // Start transaction
  223. tx, err := store.db.BeginTx(ctx, nil)
  224. if err != nil {
  225. return fmt.Errorf("failed to begin transaction: %w", err)
  226. }
  227. defer tx.Rollback()
  228. // Check if user exists
  229. var count int
  230. err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
  231. if err != nil {
  232. return fmt.Errorf("failed to check user existence: %w", err)
  233. }
  234. if count == 0 {
  235. return credential.ErrUserNotFound
  236. }
  237. // Marshal account data
  238. var accountDataJSON []byte
  239. if identity.Account != nil {
  240. accountDataJSON, err = json.Marshal(identity.Account)
  241. if err != nil {
  242. return fmt.Errorf("failed to marshal account data: %w", err)
  243. }
  244. }
  245. // Marshal actions
  246. var actionsJSON []byte
  247. if identity.Actions != nil {
  248. actionsJSON, err = json.Marshal(identity.Actions)
  249. if err != nil {
  250. return fmt.Errorf("failed to marshal actions: %w", err)
  251. }
  252. }
  253. // Update user
  254. _, err = tx.ExecContext(ctx,
  255. "UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
  256. username, "", accountDataJSON, actionsJSON)
  257. if err != nil {
  258. return fmt.Errorf("failed to update user: %w", err)
  259. }
  260. // Delete existing credentials
  261. _, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
  262. if err != nil {
  263. return fmt.Errorf("failed to delete existing credentials: %w", err)
  264. }
  265. // Insert new credentials
  266. for _, cred := range identity.Credentials {
  267. _, err = tx.ExecContext(ctx,
  268. "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
  269. username, cred.AccessKey, cred.SecretKey)
  270. if err != nil {
  271. return fmt.Errorf("failed to insert credential: %w", err)
  272. }
  273. }
  274. return tx.Commit()
  275. }
  276. func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
  277. if !store.configured {
  278. return fmt.Errorf("store not configured")
  279. }
  280. result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
  281. if err != nil {
  282. return fmt.Errorf("failed to delete user: %w", err)
  283. }
  284. rowsAffected, err := result.RowsAffected()
  285. if err != nil {
  286. return fmt.Errorf("failed to get rows affected: %w", err)
  287. }
  288. if rowsAffected == 0 {
  289. return credential.ErrUserNotFound
  290. }
  291. return nil
  292. }
  293. func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
  294. if !store.configured {
  295. return nil, fmt.Errorf("store not configured")
  296. }
  297. rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
  298. if err != nil {
  299. return nil, fmt.Errorf("failed to query users: %w", err)
  300. }
  301. defer rows.Close()
  302. var usernames []string
  303. for rows.Next() {
  304. var username string
  305. if err := rows.Scan(&username); err != nil {
  306. return nil, fmt.Errorf("failed to scan username: %w", err)
  307. }
  308. usernames = append(usernames, username)
  309. }
  310. return usernames, nil
  311. }
  312. func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
  313. if !store.configured {
  314. return nil, fmt.Errorf("store not configured")
  315. }
  316. var username string
  317. err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
  318. if err != nil {
  319. if err == sql.ErrNoRows {
  320. return nil, credential.ErrAccessKeyNotFound
  321. }
  322. return nil, fmt.Errorf("failed to query access key: %w", err)
  323. }
  324. return store.GetUser(ctx, username)
  325. }
  326. func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
  327. if !store.configured {
  328. return fmt.Errorf("store not configured")
  329. }
  330. // Check if user exists
  331. var count int
  332. err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
  333. if err != nil {
  334. return fmt.Errorf("failed to check user existence: %w", err)
  335. }
  336. if count == 0 {
  337. return credential.ErrUserNotFound
  338. }
  339. // Insert credential
  340. _, err = store.db.ExecContext(ctx,
  341. "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
  342. username, cred.AccessKey, cred.SecretKey)
  343. if err != nil {
  344. return fmt.Errorf("failed to insert credential: %w", err)
  345. }
  346. return nil
  347. }
  348. func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
  349. if !store.configured {
  350. return fmt.Errorf("store not configured")
  351. }
  352. result, err := store.db.ExecContext(ctx,
  353. "DELETE FROM credentials WHERE username = $1 AND access_key = $2",
  354. username, accessKey)
  355. if err != nil {
  356. return fmt.Errorf("failed to delete access key: %w", err)
  357. }
  358. rowsAffected, err := result.RowsAffected()
  359. if err != nil {
  360. return fmt.Errorf("failed to get rows affected: %w", err)
  361. }
  362. if rowsAffected == 0 {
  363. // Check if user exists
  364. var count int
  365. err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
  366. if err != nil {
  367. return fmt.Errorf("failed to check user existence: %w", err)
  368. }
  369. if count == 0 {
  370. return credential.ErrUserNotFound
  371. }
  372. return credential.ErrAccessKeyNotFound
  373. }
  374. return nil
  375. }