sql.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. package command
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path"
  10. "strings"
  11. "time"
  12. "github.com/peterh/liner"
  13. "github.com/seaweedfs/seaweedfs/weed/query/engine"
  14. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  15. "github.com/seaweedfs/seaweedfs/weed/util/sqlutil"
  16. )
// init installs runSql as the handler invoked for the sql command.
// NOTE(review): presumably assigned here rather than in the cmdSql literal
// because runSql reads flags that are registered through cmdSql.Flag, which
// would make cmdSql's initializer depend on itself — confirm.
func init() {
	cmdSql.Run = runSql
}
// cmdSql is the command descriptor for "weed sql": it carries the usage line
// and the short and long help texts. Its Run handler is attached in init.
var cmdSql = &Command{
	UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]",
	Short:     "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes",
	Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes.
Execution Modes:
- Interactive shell (default): weed sql -interactive
- Single query: weed sql -query "SELECT * FROM user_events"
- Batch from file: weed sql -file queries.sql
- Context switching: weed sql -database analytics -interactive
Output Formats:
- table: ASCII table format (default for interactive)
- json: JSON format (default for non-interactive)
- csv: Comma-separated values
Features:
- Full WHERE clause support (=, <, >, <=, >=, !=, LIKE, IN)
- Advanced pattern matching with LIKE wildcards (%, _)
- Multi-value filtering with IN operator
- Real MQ namespace and topic discovery
- Database context switching
Examples:
weed sql -interactive
weed sql -query "SHOW DATABASES" -output json
weed sql -file batch_queries.sql -output csv
weed sql -database analytics -query "SELECT COUNT(*) FROM metrics"
weed sql -master broker1:9333 -interactive
`,
}
// Command-line flags of the sql command, registered on cmdSql's flag set.
// Each variable is a pointer that holds the flag's value.
var (
	// sqlMaster is the master address handed to the SQL engine.
	sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address")
	// sqlInteractive forces the interactive flag in the SQL context; the
	// shell itself is only started when neither -query nor -file is given.
	sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode")
	// sqlFile names a file whose statements are executed in batch mode.
	sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file")
	// sqlOutput selects the output format; empty means auto-detect.
	sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)")
	// sqlDatabase is the database context selected at startup, if any.
	sqlDatabase = cmdSql.Flag.String("database", "", "default database context")
	// sqlQuery holds a single statement to execute and then exit.
	sqlQuery = cmdSql.Flag.String("query", "", "execute single SQL query")
)
// OutputFormat names one of the ways query results can be rendered.
type OutputFormat string

// Supported output formats. The string values match the accepted values of
// the -output flag and of the interactive \format command.
const (
	OutputTable OutputFormat = "table" // ASCII table
	OutputJSON  OutputFormat = "json"  // indented JSON document
	OutputCSV   OutputFormat = "csv"   // comma-separated values
)
// SQLContext holds the state shared by all execution modes of the sql command.
type SQLContext struct {
	// engine executes the SQL statements.
	engine *engine.SQLEngine
	// currentDatabase is the database shown in the interactive prompt; it is
	// set from -database and updated after a successful USE statement.
	currentDatabase string
	// outputFormat selects how results and errors are rendered.
	outputFormat OutputFormat
	// interactive records whether interactive mode was requested or implied.
	interactive bool
}
  69. func runSql(command *Command, args []string) bool {
  70. // Initialize SQL engine with master address for service discovery
  71. sqlEngine := engine.NewSQLEngine(*sqlMaster)
  72. // Determine execution mode and output format
  73. interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "")
  74. outputFormat := determineOutputFormat(*sqlOutput, interactive)
  75. // Create SQL context
  76. ctx := &SQLContext{
  77. engine: sqlEngine,
  78. currentDatabase: *sqlDatabase,
  79. outputFormat: outputFormat,
  80. interactive: interactive,
  81. }
  82. // Set current database in SQL engine if specified via command line
  83. if *sqlDatabase != "" {
  84. ctx.engine.GetCatalog().SetCurrentDatabase(*sqlDatabase)
  85. }
  86. // Execute based on mode
  87. switch {
  88. case *sqlQuery != "":
  89. // Single query mode
  90. return executeSingleQuery(ctx, *sqlQuery)
  91. case *sqlFile != "":
  92. // Batch file mode
  93. return executeFileQueries(ctx, *sqlFile)
  94. default:
  95. // Interactive mode
  96. return runInteractiveShell(ctx)
  97. }
  98. }
  99. // determineOutputFormat selects the appropriate output format
  100. func determineOutputFormat(specified string, interactive bool) OutputFormat {
  101. switch strings.ToLower(specified) {
  102. case "table":
  103. return OutputTable
  104. case "json":
  105. return OutputJSON
  106. case "csv":
  107. return OutputCSV
  108. default:
  109. // Auto-detect based on mode
  110. if interactive {
  111. return OutputTable
  112. }
  113. return OutputJSON
  114. }
  115. }
  116. // executeSingleQuery executes a single query and outputs the result
  117. func executeSingleQuery(ctx *SQLContext, query string) bool {
  118. if ctx.outputFormat != OutputTable {
  119. // Suppress banner for non-interactive output
  120. return executeAndDisplay(ctx, query, false)
  121. }
  122. fmt.Printf("Executing query against %s...\n", *sqlMaster)
  123. return executeAndDisplay(ctx, query, true)
  124. }
  125. // executeFileQueries processes SQL queries from a file
  126. func executeFileQueries(ctx *SQLContext, filename string) bool {
  127. content, err := os.ReadFile(filename)
  128. if err != nil {
  129. fmt.Printf("Error reading file %s: %v\n", filename, err)
  130. return false
  131. }
  132. if ctx.outputFormat == OutputTable && ctx.interactive {
  133. fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster)
  134. }
  135. // Split file content into individual queries (robust approach)
  136. queries := sqlutil.SplitStatements(string(content))
  137. for i, query := range queries {
  138. query = strings.TrimSpace(query)
  139. if query == "" {
  140. continue
  141. }
  142. if ctx.outputFormat == OutputTable && len(queries) > 1 {
  143. fmt.Printf("\n--- Query %d ---\n", i+1)
  144. }
  145. if !executeAndDisplay(ctx, query, ctx.outputFormat == OutputTable) {
  146. return false
  147. }
  148. }
  149. return true
  150. }
  151. // runInteractiveShell starts the enhanced interactive shell with readline support
  152. func runInteractiveShell(ctx *SQLContext) bool {
  153. fmt.Println("SeaweedFS Enhanced SQL Interface")
  154. fmt.Println("Type 'help;' for help, 'exit;' to quit")
  155. fmt.Printf("Connected to master: %s\n", *sqlMaster)
  156. if ctx.currentDatabase != "" {
  157. fmt.Printf("Current database: %s\n", ctx.currentDatabase)
  158. }
  159. fmt.Println("Advanced WHERE operators supported: <=, >=, !=, LIKE, IN")
  160. fmt.Println("Use up/down arrows for command history")
  161. fmt.Println()
  162. // Initialize liner for readline functionality
  163. line := liner.NewLiner()
  164. defer line.Close()
  165. // Handle Ctrl+C gracefully
  166. line.SetCtrlCAborts(true)
  167. grace.OnInterrupt(func() {
  168. line.Close()
  169. })
  170. // Load command history
  171. historyPath := path.Join(os.TempDir(), "weed-sql-history")
  172. if f, err := os.Open(historyPath); err == nil {
  173. line.ReadHistory(f)
  174. f.Close()
  175. }
  176. // Save history on exit
  177. defer func() {
  178. if f, err := os.Create(historyPath); err == nil {
  179. line.WriteHistory(f)
  180. f.Close()
  181. }
  182. }()
  183. var queryBuffer strings.Builder
  184. for {
  185. // Show prompt with current database context
  186. var prompt string
  187. if queryBuffer.Len() == 0 {
  188. if ctx.currentDatabase != "" {
  189. prompt = fmt.Sprintf("seaweedfs:%s> ", ctx.currentDatabase)
  190. } else {
  191. prompt = "seaweedfs> "
  192. }
  193. } else {
  194. prompt = " -> " // Continuation prompt
  195. }
  196. // Read line with readline support
  197. input, err := line.Prompt(prompt)
  198. if err != nil {
  199. if err == liner.ErrPromptAborted {
  200. fmt.Println("Query cancelled")
  201. queryBuffer.Reset()
  202. continue
  203. }
  204. if err != io.EOF {
  205. fmt.Printf("Input error: %v\n", err)
  206. }
  207. break
  208. }
  209. lineStr := strings.TrimSpace(input)
  210. // Handle empty lines
  211. if lineStr == "" {
  212. continue
  213. }
  214. // Accumulate lines in query buffer
  215. if queryBuffer.Len() > 0 {
  216. queryBuffer.WriteString(" ")
  217. }
  218. queryBuffer.WriteString(lineStr)
  219. // Check if we have a complete statement (ends with semicolon or special command)
  220. fullQuery := strings.TrimSpace(queryBuffer.String())
  221. isComplete := strings.HasSuffix(lineStr, ";") ||
  222. isSpecialCommand(fullQuery)
  223. if !isComplete {
  224. continue // Continue reading more lines
  225. }
  226. // Add completed command to history
  227. line.AppendHistory(fullQuery)
  228. // Handle special commands (with or without semicolon)
  229. cleanQuery := strings.TrimSuffix(fullQuery, ";")
  230. cleanQuery = strings.TrimSpace(cleanQuery)
  231. if cleanQuery == "exit" || cleanQuery == "quit" || cleanQuery == "\\q" {
  232. fmt.Println("Goodbye!")
  233. break
  234. }
  235. if cleanQuery == "help" {
  236. showEnhancedHelp()
  237. queryBuffer.Reset()
  238. continue
  239. }
  240. // Handle database switching - use proper SQL parser instead of manual parsing
  241. if strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") {
  242. // Execute USE statement through the SQL engine for proper parsing
  243. result, err := ctx.engine.ExecuteSQL(context.Background(), cleanQuery)
  244. if err != nil {
  245. fmt.Printf("Error: %v\n\n", err)
  246. } else if result.Error != nil {
  247. fmt.Printf("Error: %v\n\n", result.Error)
  248. } else {
  249. // Extract the database name from the result message for CLI context
  250. if len(result.Rows) > 0 && len(result.Rows[0]) > 0 {
  251. message := result.Rows[0][0].ToString()
  252. // Extract database name from "Database changed to: dbname"
  253. if strings.HasPrefix(message, "Database changed to: ") {
  254. ctx.currentDatabase = strings.TrimPrefix(message, "Database changed to: ")
  255. }
  256. fmt.Printf("%s\n\n", message)
  257. }
  258. }
  259. queryBuffer.Reset()
  260. continue
  261. }
  262. // Handle output format switching
  263. if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") {
  264. format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT "))
  265. switch format {
  266. case "TABLE":
  267. ctx.outputFormat = OutputTable
  268. fmt.Println("Output format set to: table")
  269. case "JSON":
  270. ctx.outputFormat = OutputJSON
  271. fmt.Println("Output format set to: json")
  272. case "CSV":
  273. ctx.outputFormat = OutputCSV
  274. fmt.Println("Output format set to: csv")
  275. default:
  276. fmt.Printf("Invalid format: %s. Supported: table, json, csv\n", format)
  277. }
  278. queryBuffer.Reset()
  279. continue
  280. }
  281. // Execute SQL query (without semicolon)
  282. executeAndDisplay(ctx, cleanQuery, true)
  283. // Reset buffer for next query
  284. queryBuffer.Reset()
  285. }
  286. return true
  287. }
  288. // isSpecialCommand checks if a command is a special command that doesn't require semicolon
  289. func isSpecialCommand(query string) bool {
  290. cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";")
  291. cleanQuery = strings.ToLower(cleanQuery)
  292. // Special commands that work with or without semicolon
  293. specialCommands := []string{
  294. "exit", "quit", "\\q", "help",
  295. }
  296. for _, cmd := range specialCommands {
  297. if cleanQuery == cmd {
  298. return true
  299. }
  300. }
  301. // Commands that are exactly specific commands (not just prefixes)
  302. parts := strings.Fields(strings.ToUpper(cleanQuery))
  303. if len(parts) == 0 {
  304. return false
  305. }
  306. return (parts[0] == "USE" && len(parts) >= 2) ||
  307. strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")
  308. }
  309. // executeAndDisplay executes a query and displays the result in the specified format
  310. func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool {
  311. startTime := time.Now()
  312. // Execute the query
  313. execCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  314. defer cancel()
  315. result, err := ctx.engine.ExecuteSQL(execCtx, query)
  316. if err != nil {
  317. if ctx.outputFormat == OutputJSON {
  318. errorResult := map[string]interface{}{
  319. "error": err.Error(),
  320. "query": query,
  321. }
  322. jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
  323. fmt.Println(string(jsonBytes))
  324. } else {
  325. fmt.Printf("Error: %v\n", err)
  326. }
  327. return false
  328. }
  329. if result.Error != nil {
  330. if ctx.outputFormat == OutputJSON {
  331. errorResult := map[string]interface{}{
  332. "error": result.Error.Error(),
  333. "query": query,
  334. }
  335. jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
  336. fmt.Println(string(jsonBytes))
  337. } else {
  338. fmt.Printf("Query Error: %v\n", result.Error)
  339. }
  340. return false
  341. }
  342. // Display results in the specified format
  343. switch ctx.outputFormat {
  344. case OutputTable:
  345. displayTableResult(result)
  346. case OutputJSON:
  347. displayJSONResult(result)
  348. case OutputCSV:
  349. displayCSVResult(result)
  350. }
  351. // Show execution time for interactive/table mode
  352. if showTiming && ctx.outputFormat == OutputTable {
  353. elapsed := time.Since(startTime)
  354. fmt.Printf("\n(%d rows in set, %.3f sec)\n\n", len(result.Rows), elapsed.Seconds())
  355. }
  356. return true
  357. }
  358. // displayTableResult formats and displays query results in ASCII table format
  359. func displayTableResult(result *engine.QueryResult) {
  360. if len(result.Columns) == 0 {
  361. fmt.Println("Empty result set")
  362. return
  363. }
  364. // Calculate column widths for formatting
  365. colWidths := make([]int, len(result.Columns))
  366. for i, col := range result.Columns {
  367. colWidths[i] = len(col)
  368. }
  369. // Check data for wider columns
  370. for _, row := range result.Rows {
  371. for i, val := range row {
  372. if i < len(colWidths) {
  373. valStr := val.ToString()
  374. if len(valStr) > colWidths[i] {
  375. colWidths[i] = len(valStr)
  376. }
  377. }
  378. }
  379. }
  380. // Print header separator
  381. fmt.Print("+")
  382. for _, width := range colWidths {
  383. fmt.Print(strings.Repeat("-", width+2) + "+")
  384. }
  385. fmt.Println()
  386. // Print column headers
  387. fmt.Print("|")
  388. for i, col := range result.Columns {
  389. fmt.Printf(" %-*s |", colWidths[i], col)
  390. }
  391. fmt.Println()
  392. // Print separator
  393. fmt.Print("+")
  394. for _, width := range colWidths {
  395. fmt.Print(strings.Repeat("-", width+2) + "+")
  396. }
  397. fmt.Println()
  398. // Print data rows
  399. for _, row := range result.Rows {
  400. fmt.Print("|")
  401. for i, val := range row {
  402. if i < len(colWidths) {
  403. fmt.Printf(" %-*s |", colWidths[i], val.ToString())
  404. }
  405. }
  406. fmt.Println()
  407. }
  408. // Print bottom separator
  409. fmt.Print("+")
  410. for _, width := range colWidths {
  411. fmt.Print(strings.Repeat("-", width+2) + "+")
  412. }
  413. fmt.Println()
  414. }
  415. // displayJSONResult outputs query results in JSON format
  416. func displayJSONResult(result *engine.QueryResult) {
  417. // Convert result to JSON-friendly format
  418. jsonResult := map[string]interface{}{
  419. "columns": result.Columns,
  420. "rows": make([]map[string]interface{}, len(result.Rows)),
  421. "count": len(result.Rows),
  422. }
  423. // Convert rows to JSON objects
  424. for i, row := range result.Rows {
  425. rowObj := make(map[string]interface{})
  426. for j, val := range row {
  427. if j < len(result.Columns) {
  428. rowObj[result.Columns[j]] = val.ToString()
  429. }
  430. }
  431. jsonResult["rows"].([]map[string]interface{})[i] = rowObj
  432. }
  433. // Marshal and print JSON
  434. jsonBytes, err := json.MarshalIndent(jsonResult, "", " ")
  435. if err != nil {
  436. fmt.Printf("Error formatting JSON: %v\n", err)
  437. return
  438. }
  439. fmt.Println(string(jsonBytes))
  440. }
  441. // displayCSVResult outputs query results in CSV format
  442. func displayCSVResult(result *engine.QueryResult) {
  443. // Handle execution plan results specially to avoid CSV quoting issues
  444. if len(result.Columns) == 1 && result.Columns[0] == "Query Execution Plan" {
  445. // For execution plans, output directly without CSV encoding to avoid quotes
  446. for _, row := range result.Rows {
  447. if len(row) > 0 {
  448. fmt.Println(row[0].ToString())
  449. }
  450. }
  451. return
  452. }
  453. // Standard CSV output for regular query results
  454. writer := csv.NewWriter(os.Stdout)
  455. defer writer.Flush()
  456. // Write headers
  457. if err := writer.Write(result.Columns); err != nil {
  458. fmt.Printf("Error writing CSV headers: %v\n", err)
  459. return
  460. }
  461. // Write data rows
  462. for _, row := range result.Rows {
  463. csvRow := make([]string, len(row))
  464. for i, val := range row {
  465. csvRow[i] = val.ToString()
  466. }
  467. if err := writer.Write(csvRow); err != nil {
  468. fmt.Printf("Error writing CSV row: %v\n", err)
  469. return
  470. }
  471. }
  472. }
// showEnhancedHelp prints the interactive shell's help text to standard
// output: metadata statements, query examples, EXPLAIN, DDL notes, the
// shell's special commands and the supported WHERE operators.
func showEnhancedHelp() {
	fmt.Println(`SeaweedFS Enhanced SQL Interface Help:
METADATA OPERATIONS:
SHOW DATABASES; - List all MQ namespaces
SHOW TABLES; - List all topics in current namespace
SHOW TABLES FROM database; - List topics in specific namespace
DESCRIBE table_name; - Show table schema
ADVANCED QUERYING:
SELECT * FROM table_name; - Query all data
SELECT col1, col2 FROM table WHERE ...; - Column projection
SELECT * FROM table WHERE id <= 100; - Range filtering
SELECT * FROM table WHERE name LIKE 'admin%'; - Pattern matching
SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value
SELECT COUNT(*), MAX(id), MIN(id) FROM ...; - Aggregation functions
QUERY ANALYSIS:
EXPLAIN SELECT ...; - Show hierarchical execution plan
(data sources, optimizations, timing)
DDL OPERATIONS:
CREATE TABLE topic (field1 INT, field2 STRING); - Create topic
Note: ALTER TABLE and DROP TABLE are not supported
SPECIAL COMMANDS:
USE database_name; - Switch database context
\format table|json|csv - Change output format
help; - Show this help
exit; or quit; or \q - Exit interface
EXTENDED WHERE OPERATORS:
=, <, >, <=, >= - Comparison operators
!=, <> - Not equal operators
LIKE 'pattern%' - Pattern matching (% = any chars, _ = single char)
IN (value1, value2, ...) - Multi-value matching
AND, OR - Logical operators
EXAMPLES:
SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted';
SELECT username FROM users WHERE email LIKE '%@company.com';
SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01';
EXPLAIN SELECT MAX(id) FROM events; -- View execution plan
Current Status: Full WHERE clause support + Real MQ integration`)
}