| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package postgres
- import (
- "context"
- "database/sql"
- "encoding/json"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/credential"
- "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
- )
- func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) {
- if !store.configured {
- return nil, fmt.Errorf("store not configured")
- }
- config := &iam_pb.S3ApiConfiguration{}
- // Query all users
- rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users")
- if err != nil {
- return nil, fmt.Errorf("failed to query users: %w", err)
- }
- defer rows.Close()
- for rows.Next() {
- var username, email string
- var accountDataJSON, actionsJSON []byte
- if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil {
- return nil, fmt.Errorf("failed to scan user row: %w", err)
- }
- identity := &iam_pb.Identity{
- Name: username,
- }
- // Parse account data
- if len(accountDataJSON) > 0 {
- if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
- return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err)
- }
- }
- // Parse actions
- if len(actionsJSON) > 0 {
- if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
- return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err)
- }
- }
- // Query credentials for this user
- credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
- if err != nil {
- return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err)
- }
- for credRows.Next() {
- var accessKey, secretKey string
- if err := credRows.Scan(&accessKey, &secretKey); err != nil {
- credRows.Close()
- return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err)
- }
- identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
- AccessKey: accessKey,
- SecretKey: secretKey,
- })
- }
- credRows.Close()
- config.Identities = append(config.Identities, identity)
- }
- return config, nil
- }
- func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- // Start transaction
- tx, err := store.db.BeginTx(ctx, nil)
- if err != nil {
- return fmt.Errorf("failed to begin transaction: %w", err)
- }
- defer tx.Rollback()
- // Clear existing data
- if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil {
- return fmt.Errorf("failed to clear credentials: %w", err)
- }
- if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil {
- return fmt.Errorf("failed to clear users: %w", err)
- }
- // Insert all identities
- for _, identity := range config.Identities {
- // Marshal account data
- var accountDataJSON []byte
- if identity.Account != nil {
- accountDataJSON, err = json.Marshal(identity.Account)
- if err != nil {
- return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err)
- }
- }
- // Marshal actions
- var actionsJSON []byte
- if identity.Actions != nil {
- actionsJSON, err = json.Marshal(identity.Actions)
- if err != nil {
- return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err)
- }
- }
- // Insert user
- _, err := tx.ExecContext(ctx,
- "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
- identity.Name, "", accountDataJSON, actionsJSON)
- if err != nil {
- return fmt.Errorf("failed to insert user %s: %v", identity.Name, err)
- }
- // Insert credentials
- for _, cred := range identity.Credentials {
- _, err := tx.ExecContext(ctx,
- "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
- identity.Name, cred.AccessKey, cred.SecretKey)
- if err != nil {
- return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err)
- }
- }
- }
- return tx.Commit()
- }
- func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- // Check if user already exists
- var count int
- err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count)
- if err != nil {
- return fmt.Errorf("failed to check user existence: %w", err)
- }
- if count > 0 {
- return credential.ErrUserAlreadyExists
- }
- // Start transaction
- tx, err := store.db.BeginTx(ctx, nil)
- if err != nil {
- return fmt.Errorf("failed to begin transaction: %w", err)
- }
- defer tx.Rollback()
- // Marshal account data
- var accountDataJSON []byte
- if identity.Account != nil {
- accountDataJSON, err = json.Marshal(identity.Account)
- if err != nil {
- return fmt.Errorf("failed to marshal account data: %w", err)
- }
- }
- // Marshal actions
- var actionsJSON []byte
- if identity.Actions != nil {
- actionsJSON, err = json.Marshal(identity.Actions)
- if err != nil {
- return fmt.Errorf("failed to marshal actions: %w", err)
- }
- }
- // Insert user
- _, err = tx.ExecContext(ctx,
- "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)",
- identity.Name, "", accountDataJSON, actionsJSON)
- if err != nil {
- return fmt.Errorf("failed to insert user: %w", err)
- }
- // Insert credentials
- for _, cred := range identity.Credentials {
- _, err = tx.ExecContext(ctx,
- "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
- identity.Name, cred.AccessKey, cred.SecretKey)
- if err != nil {
- return fmt.Errorf("failed to insert credential: %w", err)
- }
- }
- return tx.Commit()
- }
- func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) {
- if !store.configured {
- return nil, fmt.Errorf("store not configured")
- }
- var email string
- var accountDataJSON, actionsJSON []byte
- err := store.db.QueryRowContext(ctx,
- "SELECT email, account_data, actions FROM users WHERE username = $1",
- username).Scan(&email, &accountDataJSON, &actionsJSON)
- if err != nil {
- if err == sql.ErrNoRows {
- return nil, credential.ErrUserNotFound
- }
- return nil, fmt.Errorf("failed to query user: %w", err)
- }
- identity := &iam_pb.Identity{
- Name: username,
- }
- // Parse account data
- if len(accountDataJSON) > 0 {
- if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil {
- return nil, fmt.Errorf("failed to unmarshal account data: %w", err)
- }
- }
- // Parse actions
- if len(actionsJSON) > 0 {
- if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil {
- return nil, fmt.Errorf("failed to unmarshal actions: %w", err)
- }
- }
- // Query credentials
- rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username)
- if err != nil {
- return nil, fmt.Errorf("failed to query credentials: %w", err)
- }
- defer rows.Close()
- for rows.Next() {
- var accessKey, secretKey string
- if err := rows.Scan(&accessKey, &secretKey); err != nil {
- return nil, fmt.Errorf("failed to scan credential: %w", err)
- }
- identity.Credentials = append(identity.Credentials, &iam_pb.Credential{
- AccessKey: accessKey,
- SecretKey: secretKey,
- })
- }
- return identity, nil
- }
- func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- // Start transaction
- tx, err := store.db.BeginTx(ctx, nil)
- if err != nil {
- return fmt.Errorf("failed to begin transaction: %w", err)
- }
- defer tx.Rollback()
- // Check if user exists
- var count int
- err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
- if err != nil {
- return fmt.Errorf("failed to check user existence: %w", err)
- }
- if count == 0 {
- return credential.ErrUserNotFound
- }
- // Marshal account data
- var accountDataJSON []byte
- if identity.Account != nil {
- accountDataJSON, err = json.Marshal(identity.Account)
- if err != nil {
- return fmt.Errorf("failed to marshal account data: %w", err)
- }
- }
- // Marshal actions
- var actionsJSON []byte
- if identity.Actions != nil {
- actionsJSON, err = json.Marshal(identity.Actions)
- if err != nil {
- return fmt.Errorf("failed to marshal actions: %w", err)
- }
- }
- // Update user
- _, err = tx.ExecContext(ctx,
- "UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1",
- username, "", accountDataJSON, actionsJSON)
- if err != nil {
- return fmt.Errorf("failed to update user: %w", err)
- }
- // Delete existing credentials
- _, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username)
- if err != nil {
- return fmt.Errorf("failed to delete existing credentials: %w", err)
- }
- // Insert new credentials
- for _, cred := range identity.Credentials {
- _, err = tx.ExecContext(ctx,
- "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
- username, cred.AccessKey, cred.SecretKey)
- if err != nil {
- return fmt.Errorf("failed to insert credential: %w", err)
- }
- }
- return tx.Commit()
- }
- func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username)
- if err != nil {
- return fmt.Errorf("failed to delete user: %w", err)
- }
- rowsAffected, err := result.RowsAffected()
- if err != nil {
- return fmt.Errorf("failed to get rows affected: %w", err)
- }
- if rowsAffected == 0 {
- return credential.ErrUserNotFound
- }
- return nil
- }
- func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) {
- if !store.configured {
- return nil, fmt.Errorf("store not configured")
- }
- rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username")
- if err != nil {
- return nil, fmt.Errorf("failed to query users: %w", err)
- }
- defer rows.Close()
- var usernames []string
- for rows.Next() {
- var username string
- if err := rows.Scan(&username); err != nil {
- return nil, fmt.Errorf("failed to scan username: %w", err)
- }
- usernames = append(usernames, username)
- }
- return usernames, nil
- }
- func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) {
- if !store.configured {
- return nil, fmt.Errorf("store not configured")
- }
- var username string
- err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username)
- if err != nil {
- if err == sql.ErrNoRows {
- return nil, credential.ErrAccessKeyNotFound
- }
- return nil, fmt.Errorf("failed to query access key: %w", err)
- }
- return store.GetUser(ctx, username)
- }
- func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- // Check if user exists
- var count int
- err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
- if err != nil {
- return fmt.Errorf("failed to check user existence: %w", err)
- }
- if count == 0 {
- return credential.ErrUserNotFound
- }
- // Insert credential
- _, err = store.db.ExecContext(ctx,
- "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)",
- username, cred.AccessKey, cred.SecretKey)
- if err != nil {
- return fmt.Errorf("failed to insert credential: %w", err)
- }
- return nil
- }
- func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error {
- if !store.configured {
- return fmt.Errorf("store not configured")
- }
- result, err := store.db.ExecContext(ctx,
- "DELETE FROM credentials WHERE username = $1 AND access_key = $2",
- username, accessKey)
- if err != nil {
- return fmt.Errorf("failed to delete access key: %w", err)
- }
- rowsAffected, err := result.RowsAffected()
- if err != nil {
- return fmt.Errorf("failed to get rows affected: %w", err)
- }
- if rowsAffected == 0 {
- // Check if user exists
- var count int
- err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count)
- if err != nil {
- return fmt.Errorf("failed to check user existence: %w", err)
- }
- if count == 0 {
- return credential.ErrUserNotFound
- }
- return credential.ErrAccessKeyNotFound
- }
- return nil
- }
|