db.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. package command
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "os"
  8. "os/signal"
  9. "strings"
  10. "syscall"
  11. "time"
  12. "github.com/seaweedfs/seaweedfs/weed/server/postgres"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. )
  15. var (
  16. dbOptions DBOptions
  17. )
  18. type DBOptions struct {
  19. host *string
  20. port *int
  21. masterAddr *string
  22. authMethod *string
  23. users *string
  24. database *string
  25. maxConns *int
  26. idleTimeout *string
  27. tlsCert *string
  28. tlsKey *string
  29. }
  30. func init() {
  31. cmdDB.Run = runDB // break init cycle
  32. dbOptions.host = cmdDB.Flag.String("host", "localhost", "Database server host")
  33. dbOptions.port = cmdDB.Flag.Int("port", 5432, "Database server port")
  34. dbOptions.masterAddr = cmdDB.Flag.String("master", "localhost:9333", "SeaweedFS master server address")
  35. dbOptions.authMethod = cmdDB.Flag.String("auth", "trust", "Authentication method: trust, password, md5")
  36. dbOptions.users = cmdDB.Flag.String("users", "", "User credentials for auth (JSON format '{\"user1\":\"pass1\",\"user2\":\"pass2\"}' or file '@/path/to/users.json')")
  37. dbOptions.database = cmdDB.Flag.String("database", "default", "Default database name")
  38. dbOptions.maxConns = cmdDB.Flag.Int("max-connections", 100, "Maximum concurrent connections per server")
  39. dbOptions.idleTimeout = cmdDB.Flag.String("idle-timeout", "1h", "Connection idle timeout")
  40. dbOptions.tlsCert = cmdDB.Flag.String("tls-cert", "", "TLS certificate file path")
  41. dbOptions.tlsKey = cmdDB.Flag.String("tls-key", "", "TLS private key file path")
  42. }
// cmdDB describes the "db" subcommand: its usage line, one-line summary and
// the long help text shown to users. Its Run field is not set here; it is
// assigned in init to break an initialization cycle.
var cmdDB = &Command{
	UsageLine: "db -port=5432 -master=<master_server>",
	Short: "start a PostgreSQL-compatible database server for SQL queries",
	Long: `Start a PostgreSQL wire protocol compatible database server that provides SQL query access to SeaweedFS.
This database server enables any PostgreSQL client, tool, or application to connect to SeaweedFS
and execute SQL queries against MQ topics. It implements the PostgreSQL wire protocol for maximum
compatibility with the existing PostgreSQL ecosystem.
Examples:
# Start database server on default port 5432
weed db
# Start with MD5 authentication using JSON format (recommended)
weed db -auth=md5 -users='{"admin":"secret","readonly":"view123"}'
# Start with complex passwords using JSON format
weed db -auth=md5 -users='{"admin":"pass;with;semicolons","user":"password:with:colons"}'
# Start with credentials from JSON file (most secure)
weed db -auth=md5 -users="@/etc/seaweedfs/users.json"
# Start with custom port and master
weed db -port=5433 -master=master1:9333
# Allow connections from any host
weed db -host=0.0.0.0 -port=5432
# Start with TLS encryption
weed db -tls-cert=server.crt -tls-key=server.key
Client Connection Examples:
# psql command line client
psql "host=localhost port=5432 dbname=default user=seaweedfs"
psql -h localhost -p 5432 -U seaweedfs -d default
# With password
PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default
# Connection string
psql "postgresql://admin:secret@localhost:5432/default"
Programming Language Examples:
# Python (psycopg2)
import psycopg2
conn = psycopg2.connect(
host="localhost", port=5432,
user="seaweedfs", database="default"
)
# Java JDBC
String url = "jdbc:postgresql://localhost:5432/default";
Connection conn = DriverManager.getConnection(url, "seaweedfs", "");
# Go (lib/pq)
db, err := sql.Open("postgres", "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable")
# Node.js (pg)
const client = new Client({
host: 'localhost', port: 5432,
user: 'seaweedfs', database: 'default'
});
Supported SQL Operations:
- SELECT queries on MQ topics
- DESCRIBE/DESC table_name commands
- EXPLAIN query execution plans
- SHOW DATABASES/TABLES commands
- Aggregation functions (COUNT, SUM, AVG, MIN, MAX)
- WHERE clauses with filtering
- System columns (_timestamp_ns, _key, _source)
- Basic PostgreSQL system queries (version(), current_database(), current_user)
Authentication Methods:
- trust: No authentication required (default)
- password: Clear text password authentication
- md5: MD5 password authentication
User Credential Formats:
- JSON format: '{"user1":"pass1","user2":"pass2"}' (supports any special characters)
- File format: "@/path/to/users.json" (JSON file)
Note: JSON format supports passwords with semicolons, colons, and any other special characters.
File format is recommended for production to keep credentials secure.
Compatible Tools:
- psql (PostgreSQL command line client)
- Any PostgreSQL JDBC/ODBC compatible tool
Security Features:
- Multiple authentication methods
- TLS encryption support
- Read-only access (no data modification)
Performance Features:
- Fast path aggregation optimization (COUNT, MIN, MAX without WHERE clauses)
- Hybrid data scanning (parquet files + live logs)
- PostgreSQL wire protocol
- Query result streaming
`,
}
  122. func runDB(cmd *Command, args []string) bool {
  123. util.LoadConfiguration("security", false)
  124. // Validate options
  125. if *dbOptions.masterAddr == "" {
  126. fmt.Fprintf(os.Stderr, "Error: master address is required\n")
  127. return false
  128. }
  129. // Parse authentication method
  130. authMethod, err := parseAuthMethod(*dbOptions.authMethod)
  131. if err != nil {
  132. fmt.Fprintf(os.Stderr, "Error: %v\n", err)
  133. return false
  134. }
  135. // Parse user credentials
  136. users, err := parseUsers(*dbOptions.users, authMethod)
  137. if err != nil {
  138. fmt.Fprintf(os.Stderr, "Error: %v\n", err)
  139. return false
  140. }
  141. // Parse idle timeout
  142. idleTimeout, err := time.ParseDuration(*dbOptions.idleTimeout)
  143. if err != nil {
  144. fmt.Fprintf(os.Stderr, "Error parsing idle timeout: %v\n", err)
  145. return false
  146. }
  147. // Validate port number
  148. if err := validatePortNumber(*dbOptions.port); err != nil {
  149. fmt.Fprintf(os.Stderr, "Error: %v\n", err)
  150. return false
  151. }
  152. // Setup TLS if requested
  153. var tlsConfig *tls.Config
  154. if *dbOptions.tlsCert != "" && *dbOptions.tlsKey != "" {
  155. cert, err := tls.LoadX509KeyPair(*dbOptions.tlsCert, *dbOptions.tlsKey)
  156. if err != nil {
  157. fmt.Fprintf(os.Stderr, "Error loading TLS certificates: %v\n", err)
  158. return false
  159. }
  160. tlsConfig = &tls.Config{
  161. Certificates: []tls.Certificate{cert},
  162. }
  163. }
  164. // Create server configuration
  165. config := &postgres.PostgreSQLServerConfig{
  166. Host: *dbOptions.host,
  167. Port: *dbOptions.port,
  168. AuthMethod: authMethod,
  169. Users: users,
  170. Database: *dbOptions.database,
  171. MaxConns: *dbOptions.maxConns,
  172. IdleTimeout: idleTimeout,
  173. TLSConfig: tlsConfig,
  174. }
  175. // Create database server
  176. dbServer, err := postgres.NewPostgreSQLServer(config, *dbOptions.masterAddr)
  177. if err != nil {
  178. fmt.Fprintf(os.Stderr, "Error creating database server: %v\n", err)
  179. return false
  180. }
  181. // Print startup information
  182. fmt.Printf("Starting SeaweedFS Database Server...\n")
  183. fmt.Printf("Host: %s\n", *dbOptions.host)
  184. fmt.Printf("Port: %d\n", *dbOptions.port)
  185. fmt.Printf("Master: %s\n", *dbOptions.masterAddr)
  186. fmt.Printf("Database: %s\n", *dbOptions.database)
  187. fmt.Printf("Auth Method: %s\n", *dbOptions.authMethod)
  188. fmt.Printf("Max Connections: %d\n", *dbOptions.maxConns)
  189. fmt.Printf("Idle Timeout: %s\n", *dbOptions.idleTimeout)
  190. if tlsConfig != nil {
  191. fmt.Printf("TLS: Enabled\n")
  192. } else {
  193. fmt.Printf("TLS: Disabled\n")
  194. }
  195. if len(users) > 0 {
  196. fmt.Printf("Users: %d configured\n", len(users))
  197. }
  198. fmt.Printf("\nDatabase Connection Examples:\n")
  199. fmt.Printf(" psql -h %s -p %d -U seaweedfs -d %s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database)
  200. if len(users) > 0 {
  201. // Show first user as example
  202. for username := range users {
  203. fmt.Printf(" psql -h %s -p %d -U %s -d %s\n", *dbOptions.host, *dbOptions.port, username, *dbOptions.database)
  204. break
  205. }
  206. }
  207. fmt.Printf(" postgresql://%s:%d/%s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database)
  208. fmt.Printf("\nSupported Operations:\n")
  209. fmt.Printf(" - SELECT queries on MQ topics\n")
  210. fmt.Printf(" - DESCRIBE/DESC table_name\n")
  211. fmt.Printf(" - EXPLAIN query execution plans\n")
  212. fmt.Printf(" - SHOW DATABASES/TABLES\n")
  213. fmt.Printf(" - Aggregations: COUNT, SUM, AVG, MIN, MAX\n")
  214. fmt.Printf(" - System columns: _timestamp_ns, _key, _source\n")
  215. fmt.Printf(" - Basic PostgreSQL system queries\n")
  216. fmt.Printf("\nReady for database connections!\n\n")
  217. // Start the server
  218. err = dbServer.Start()
  219. if err != nil {
  220. fmt.Fprintf(os.Stderr, "Error starting database server: %v\n", err)
  221. return false
  222. }
  223. // Set up signal handling for graceful shutdown
  224. sigChan := make(chan os.Signal, 1)
  225. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  226. // Wait for shutdown signal
  227. <-sigChan
  228. fmt.Printf("\nReceived shutdown signal, stopping database server...\n")
  229. // Create context with timeout for graceful shutdown
  230. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  231. defer cancel()
  232. // Stop the server with timeout
  233. done := make(chan error, 1)
  234. go func() {
  235. done <- dbServer.Stop()
  236. }()
  237. select {
  238. case err := <-done:
  239. if err != nil {
  240. fmt.Fprintf(os.Stderr, "Error stopping database server: %v\n", err)
  241. return false
  242. }
  243. fmt.Printf("Database server stopped successfully\n")
  244. case <-ctx.Done():
  245. fmt.Fprintf(os.Stderr, "Timeout waiting for database server to stop\n")
  246. return false
  247. }
  248. return true
  249. }
  250. // parseAuthMethod parses the authentication method string
  251. func parseAuthMethod(method string) (postgres.AuthMethod, error) {
  252. switch strings.ToLower(method) {
  253. case "trust":
  254. return postgres.AuthTrust, nil
  255. case "password":
  256. return postgres.AuthPassword, nil
  257. case "md5":
  258. return postgres.AuthMD5, nil
  259. default:
  260. return postgres.AuthTrust, fmt.Errorf("unsupported auth method '%s'. Supported: trust, password, md5", method)
  261. }
  262. }
  263. // parseUsers parses the user credentials string with support for secure formats only
  264. // Supported formats:
  265. // 1. JSON format: {"username":"password","username2":"password2"}
  266. // 2. File format: /path/to/users.json or @/path/to/users.json
  267. func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]string, error) {
  268. users := make(map[string]string)
  269. if usersStr == "" {
  270. // No users specified
  271. if authMethod != postgres.AuthTrust {
  272. return nil, fmt.Errorf("users must be specified when auth method is not 'trust'")
  273. }
  274. return users, nil
  275. }
  276. // Trim whitespace
  277. usersStr = strings.TrimSpace(usersStr)
  278. // Determine format and parse accordingly
  279. if strings.HasPrefix(usersStr, "{") && strings.HasSuffix(usersStr, "}") {
  280. // JSON format
  281. return parseUsersJSON(usersStr, authMethod)
  282. }
  283. // Check if it's a file path (with or without @ prefix) before declaring invalid format
  284. filePath := strings.TrimPrefix(usersStr, "@")
  285. if _, err := os.Stat(filePath); err == nil {
  286. // File format
  287. return parseUsersFile(usersStr, authMethod) // Pass original string to preserve @ handling
  288. }
  289. // Invalid format
  290. return nil, fmt.Errorf("invalid user credentials format. Use JSON format '{\"user\":\"pass\"}' or file format '@/path/to/users.json' or 'path/to/users.json'. Legacy semicolon-separated format is no longer supported")
  291. }
  292. // parseUsersJSON parses user credentials from JSON format
  293. func parseUsersJSON(jsonStr string, authMethod postgres.AuthMethod) (map[string]string, error) {
  294. var users map[string]string
  295. if err := json.Unmarshal([]byte(jsonStr), &users); err != nil {
  296. return nil, fmt.Errorf("invalid JSON format for users: %v", err)
  297. }
  298. // Validate users
  299. for username, password := range users {
  300. if username == "" {
  301. return nil, fmt.Errorf("empty username in JSON user specification")
  302. }
  303. if authMethod != postgres.AuthTrust && password == "" {
  304. return nil, fmt.Errorf("empty password for user '%s' with auth method", username)
  305. }
  306. }
  307. return users, nil
  308. }
  309. // parseUsersFile parses user credentials from a JSON file
  310. func parseUsersFile(filePath string, authMethod postgres.AuthMethod) (map[string]string, error) {
  311. // Remove @ prefix if present
  312. filePath = strings.TrimPrefix(filePath, "@")
  313. // Read file content
  314. content, err := os.ReadFile(filePath)
  315. if err != nil {
  316. return nil, fmt.Errorf("failed to read users file '%s': %v", filePath, err)
  317. }
  318. contentStr := strings.TrimSpace(string(content))
  319. // File must contain JSON format
  320. if !strings.HasPrefix(contentStr, "{") || !strings.HasSuffix(contentStr, "}") {
  321. return nil, fmt.Errorf("users file '%s' must contain JSON format: {\"user\":\"pass\"}. Legacy formats are no longer supported", filePath)
  322. }
  323. // Parse as JSON
  324. return parseUsersJSON(contentStr, authMethod)
  325. }
  326. // validatePortNumber validates that the port number is reasonable
  327. func validatePortNumber(port int) error {
  328. if port < 1 || port > 65535 {
  329. return fmt.Errorf("port number must be between 1 and 65535, got %d", port)
  330. }
  331. if port < 1024 {
  332. fmt.Fprintf(os.Stderr, "Warning: port number %d may require root privileges\n", port)
  333. }
  334. return nil
  335. }