| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- package command
- import (
- "context"
- "encoding/csv"
- "encoding/json"
- "fmt"
- "io"
- "os"
- "path"
- "strings"
- "time"
- "github.com/peterh/liner"
- "github.com/seaweedfs/seaweedfs/weed/query/engine"
- "github.com/seaweedfs/seaweedfs/weed/util/grace"
- "github.com/seaweedfs/seaweedfs/weed/util/sqlutil"
- )
- func init() {
- cmdSql.Run = runSql
- }
- var cmdSql = &Command{
- UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]",
- Short: "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes",
- Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes.
- Execution Modes:
- - Interactive shell (default): weed sql -interactive
- - Single query: weed sql -query "SELECT * FROM user_events"
- - Batch from file: weed sql -file queries.sql
- - Context switching: weed sql -database analytics -interactive
- Output Formats:
- - table: ASCII table format (default for interactive)
- - json: JSON format (default for non-interactive)
- - csv: Comma-separated values
- Features:
- - Full WHERE clause support (=, <, >, <=, >=, !=, LIKE, IN)
- - Advanced pattern matching with LIKE wildcards (%, _)
- - Multi-value filtering with IN operator
- - Real MQ namespace and topic discovery
- - Database context switching
- Examples:
- weed sql -interactive
- weed sql -query "SHOW DATABASES" -output json
- weed sql -file batch_queries.sql -output csv
- weed sql -database analytics -query "SELECT COUNT(*) FROM metrics"
- weed sql -master broker1:9333 -interactive
- `,
- }
- var (
- sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address")
- sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode")
- sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file")
- sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)")
- sqlDatabase = cmdSql.Flag.String("database", "", "default database context")
- sqlQuery = cmdSql.Flag.String("query", "", "execute single SQL query")
- )
- // OutputFormat represents different output formatting options
- type OutputFormat string
- const (
- OutputTable OutputFormat = "table"
- OutputJSON OutputFormat = "json"
- OutputCSV OutputFormat = "csv"
- )
- // SQLContext holds the execution context for SQL operations
- type SQLContext struct {
- engine *engine.SQLEngine
- currentDatabase string
- outputFormat OutputFormat
- interactive bool
- }
- func runSql(command *Command, args []string) bool {
- // Initialize SQL engine with master address for service discovery
- sqlEngine := engine.NewSQLEngine(*sqlMaster)
- // Determine execution mode and output format
- interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "")
- outputFormat := determineOutputFormat(*sqlOutput, interactive)
- // Create SQL context
- ctx := &SQLContext{
- engine: sqlEngine,
- currentDatabase: *sqlDatabase,
- outputFormat: outputFormat,
- interactive: interactive,
- }
- // Set current database in SQL engine if specified via command line
- if *sqlDatabase != "" {
- ctx.engine.GetCatalog().SetCurrentDatabase(*sqlDatabase)
- }
- // Execute based on mode
- switch {
- case *sqlQuery != "":
- // Single query mode
- return executeSingleQuery(ctx, *sqlQuery)
- case *sqlFile != "":
- // Batch file mode
- return executeFileQueries(ctx, *sqlFile)
- default:
- // Interactive mode
- return runInteractiveShell(ctx)
- }
- }
- // determineOutputFormat selects the appropriate output format
- func determineOutputFormat(specified string, interactive bool) OutputFormat {
- switch strings.ToLower(specified) {
- case "table":
- return OutputTable
- case "json":
- return OutputJSON
- case "csv":
- return OutputCSV
- default:
- // Auto-detect based on mode
- if interactive {
- return OutputTable
- }
- return OutputJSON
- }
- }
- // executeSingleQuery executes a single query and outputs the result
- func executeSingleQuery(ctx *SQLContext, query string) bool {
- if ctx.outputFormat != OutputTable {
- // Suppress banner for non-interactive output
- return executeAndDisplay(ctx, query, false)
- }
- fmt.Printf("Executing query against %s...\n", *sqlMaster)
- return executeAndDisplay(ctx, query, true)
- }
- // executeFileQueries processes SQL queries from a file
- func executeFileQueries(ctx *SQLContext, filename string) bool {
- content, err := os.ReadFile(filename)
- if err != nil {
- fmt.Printf("Error reading file %s: %v\n", filename, err)
- return false
- }
- if ctx.outputFormat == OutputTable && ctx.interactive {
- fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster)
- }
- // Split file content into individual queries (robust approach)
- queries := sqlutil.SplitStatements(string(content))
- for i, query := range queries {
- query = strings.TrimSpace(query)
- if query == "" {
- continue
- }
- if ctx.outputFormat == OutputTable && len(queries) > 1 {
- fmt.Printf("\n--- Query %d ---\n", i+1)
- }
- if !executeAndDisplay(ctx, query, ctx.outputFormat == OutputTable) {
- return false
- }
- }
- return true
- }
- // runInteractiveShell starts the enhanced interactive shell with readline support
- func runInteractiveShell(ctx *SQLContext) bool {
- fmt.Println("SeaweedFS Enhanced SQL Interface")
- fmt.Println("Type 'help;' for help, 'exit;' to quit")
- fmt.Printf("Connected to master: %s\n", *sqlMaster)
- if ctx.currentDatabase != "" {
- fmt.Printf("Current database: %s\n", ctx.currentDatabase)
- }
- fmt.Println("Advanced WHERE operators supported: <=, >=, !=, LIKE, IN")
- fmt.Println("Use up/down arrows for command history")
- fmt.Println()
- // Initialize liner for readline functionality
- line := liner.NewLiner()
- defer line.Close()
- // Handle Ctrl+C gracefully
- line.SetCtrlCAborts(true)
- grace.OnInterrupt(func() {
- line.Close()
- })
- // Load command history
- historyPath := path.Join(os.TempDir(), "weed-sql-history")
- if f, err := os.Open(historyPath); err == nil {
- line.ReadHistory(f)
- f.Close()
- }
- // Save history on exit
- defer func() {
- if f, err := os.Create(historyPath); err == nil {
- line.WriteHistory(f)
- f.Close()
- }
- }()
- var queryBuffer strings.Builder
- for {
- // Show prompt with current database context
- var prompt string
- if queryBuffer.Len() == 0 {
- if ctx.currentDatabase != "" {
- prompt = fmt.Sprintf("seaweedfs:%s> ", ctx.currentDatabase)
- } else {
- prompt = "seaweedfs> "
- }
- } else {
- prompt = " -> " // Continuation prompt
- }
- // Read line with readline support
- input, err := line.Prompt(prompt)
- if err != nil {
- if err == liner.ErrPromptAborted {
- fmt.Println("Query cancelled")
- queryBuffer.Reset()
- continue
- }
- if err != io.EOF {
- fmt.Printf("Input error: %v\n", err)
- }
- break
- }
- lineStr := strings.TrimSpace(input)
- // Handle empty lines
- if lineStr == "" {
- continue
- }
- // Accumulate lines in query buffer
- if queryBuffer.Len() > 0 {
- queryBuffer.WriteString(" ")
- }
- queryBuffer.WriteString(lineStr)
- // Check if we have a complete statement (ends with semicolon or special command)
- fullQuery := strings.TrimSpace(queryBuffer.String())
- isComplete := strings.HasSuffix(lineStr, ";") ||
- isSpecialCommand(fullQuery)
- if !isComplete {
- continue // Continue reading more lines
- }
- // Add completed command to history
- line.AppendHistory(fullQuery)
- // Handle special commands (with or without semicolon)
- cleanQuery := strings.TrimSuffix(fullQuery, ";")
- cleanQuery = strings.TrimSpace(cleanQuery)
- if cleanQuery == "exit" || cleanQuery == "quit" || cleanQuery == "\\q" {
- fmt.Println("Goodbye!")
- break
- }
- if cleanQuery == "help" {
- showEnhancedHelp()
- queryBuffer.Reset()
- continue
- }
- // Handle database switching - use proper SQL parser instead of manual parsing
- if strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") {
- // Execute USE statement through the SQL engine for proper parsing
- result, err := ctx.engine.ExecuteSQL(context.Background(), cleanQuery)
- if err != nil {
- fmt.Printf("Error: %v\n\n", err)
- } else if result.Error != nil {
- fmt.Printf("Error: %v\n\n", result.Error)
- } else {
- // Extract the database name from the result message for CLI context
- if len(result.Rows) > 0 && len(result.Rows[0]) > 0 {
- message := result.Rows[0][0].ToString()
- // Extract database name from "Database changed to: dbname"
- if strings.HasPrefix(message, "Database changed to: ") {
- ctx.currentDatabase = strings.TrimPrefix(message, "Database changed to: ")
- }
- fmt.Printf("%s\n\n", message)
- }
- }
- queryBuffer.Reset()
- continue
- }
- // Handle output format switching
- if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") {
- format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT "))
- switch format {
- case "TABLE":
- ctx.outputFormat = OutputTable
- fmt.Println("Output format set to: table")
- case "JSON":
- ctx.outputFormat = OutputJSON
- fmt.Println("Output format set to: json")
- case "CSV":
- ctx.outputFormat = OutputCSV
- fmt.Println("Output format set to: csv")
- default:
- fmt.Printf("Invalid format: %s. Supported: table, json, csv\n", format)
- }
- queryBuffer.Reset()
- continue
- }
- // Execute SQL query (without semicolon)
- executeAndDisplay(ctx, cleanQuery, true)
- // Reset buffer for next query
- queryBuffer.Reset()
- }
- return true
- }
- // isSpecialCommand checks if a command is a special command that doesn't require semicolon
- func isSpecialCommand(query string) bool {
- cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";")
- cleanQuery = strings.ToLower(cleanQuery)
- // Special commands that work with or without semicolon
- specialCommands := []string{
- "exit", "quit", "\\q", "help",
- }
- for _, cmd := range specialCommands {
- if cleanQuery == cmd {
- return true
- }
- }
- // Commands that are exactly specific commands (not just prefixes)
- parts := strings.Fields(strings.ToUpper(cleanQuery))
- if len(parts) == 0 {
- return false
- }
- return (parts[0] == "USE" && len(parts) >= 2) ||
- strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")
- }
- // executeAndDisplay executes a query and displays the result in the specified format
- func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool {
- startTime := time.Now()
- // Execute the query
- execCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
- result, err := ctx.engine.ExecuteSQL(execCtx, query)
- if err != nil {
- if ctx.outputFormat == OutputJSON {
- errorResult := map[string]interface{}{
- "error": err.Error(),
- "query": query,
- }
- jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
- fmt.Println(string(jsonBytes))
- } else {
- fmt.Printf("Error: %v\n", err)
- }
- return false
- }
- if result.Error != nil {
- if ctx.outputFormat == OutputJSON {
- errorResult := map[string]interface{}{
- "error": result.Error.Error(),
- "query": query,
- }
- jsonBytes, _ := json.MarshalIndent(errorResult, "", " ")
- fmt.Println(string(jsonBytes))
- } else {
- fmt.Printf("Query Error: %v\n", result.Error)
- }
- return false
- }
- // Display results in the specified format
- switch ctx.outputFormat {
- case OutputTable:
- displayTableResult(result)
- case OutputJSON:
- displayJSONResult(result)
- case OutputCSV:
- displayCSVResult(result)
- }
- // Show execution time for interactive/table mode
- if showTiming && ctx.outputFormat == OutputTable {
- elapsed := time.Since(startTime)
- fmt.Printf("\n(%d rows in set, %.3f sec)\n\n", len(result.Rows), elapsed.Seconds())
- }
- return true
- }
- // displayTableResult formats and displays query results in ASCII table format
- func displayTableResult(result *engine.QueryResult) {
- if len(result.Columns) == 0 {
- fmt.Println("Empty result set")
- return
- }
- // Calculate column widths for formatting
- colWidths := make([]int, len(result.Columns))
- for i, col := range result.Columns {
- colWidths[i] = len(col)
- }
- // Check data for wider columns
- for _, row := range result.Rows {
- for i, val := range row {
- if i < len(colWidths) {
- valStr := val.ToString()
- if len(valStr) > colWidths[i] {
- colWidths[i] = len(valStr)
- }
- }
- }
- }
- // Print header separator
- fmt.Print("+")
- for _, width := range colWidths {
- fmt.Print(strings.Repeat("-", width+2) + "+")
- }
- fmt.Println()
- // Print column headers
- fmt.Print("|")
- for i, col := range result.Columns {
- fmt.Printf(" %-*s |", colWidths[i], col)
- }
- fmt.Println()
- // Print separator
- fmt.Print("+")
- for _, width := range colWidths {
- fmt.Print(strings.Repeat("-", width+2) + "+")
- }
- fmt.Println()
- // Print data rows
- for _, row := range result.Rows {
- fmt.Print("|")
- for i, val := range row {
- if i < len(colWidths) {
- fmt.Printf(" %-*s |", colWidths[i], val.ToString())
- }
- }
- fmt.Println()
- }
- // Print bottom separator
- fmt.Print("+")
- for _, width := range colWidths {
- fmt.Print(strings.Repeat("-", width+2) + "+")
- }
- fmt.Println()
- }
- // displayJSONResult outputs query results in JSON format
- func displayJSONResult(result *engine.QueryResult) {
- // Convert result to JSON-friendly format
- jsonResult := map[string]interface{}{
- "columns": result.Columns,
- "rows": make([]map[string]interface{}, len(result.Rows)),
- "count": len(result.Rows),
- }
- // Convert rows to JSON objects
- for i, row := range result.Rows {
- rowObj := make(map[string]interface{})
- for j, val := range row {
- if j < len(result.Columns) {
- rowObj[result.Columns[j]] = val.ToString()
- }
- }
- jsonResult["rows"].([]map[string]interface{})[i] = rowObj
- }
- // Marshal and print JSON
- jsonBytes, err := json.MarshalIndent(jsonResult, "", " ")
- if err != nil {
- fmt.Printf("Error formatting JSON: %v\n", err)
- return
- }
- fmt.Println(string(jsonBytes))
- }
- // displayCSVResult outputs query results in CSV format
- func displayCSVResult(result *engine.QueryResult) {
- // Handle execution plan results specially to avoid CSV quoting issues
- if len(result.Columns) == 1 && result.Columns[0] == "Query Execution Plan" {
- // For execution plans, output directly without CSV encoding to avoid quotes
- for _, row := range result.Rows {
- if len(row) > 0 {
- fmt.Println(row[0].ToString())
- }
- }
- return
- }
- // Standard CSV output for regular query results
- writer := csv.NewWriter(os.Stdout)
- defer writer.Flush()
- // Write headers
- if err := writer.Write(result.Columns); err != nil {
- fmt.Printf("Error writing CSV headers: %v\n", err)
- return
- }
- // Write data rows
- for _, row := range result.Rows {
- csvRow := make([]string, len(row))
- for i, val := range row {
- csvRow[i] = val.ToString()
- }
- if err := writer.Write(csvRow); err != nil {
- fmt.Printf("Error writing CSV row: %v\n", err)
- return
- }
- }
- }
- func showEnhancedHelp() {
- fmt.Println(`SeaweedFS Enhanced SQL Interface Help:
- METADATA OPERATIONS:
- SHOW DATABASES; - List all MQ namespaces
- SHOW TABLES; - List all topics in current namespace
- SHOW TABLES FROM database; - List topics in specific namespace
- DESCRIBE table_name; - Show table schema
- ADVANCED QUERYING:
- SELECT * FROM table_name; - Query all data
- SELECT col1, col2 FROM table WHERE ...; - Column projection
- SELECT * FROM table WHERE id <= 100; - Range filtering
- SELECT * FROM table WHERE name LIKE 'admin%'; - Pattern matching
- SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value
- SELECT COUNT(*), MAX(id), MIN(id) FROM ...; - Aggregation functions
- QUERY ANALYSIS:
- EXPLAIN SELECT ...; - Show hierarchical execution plan
- (data sources, optimizations, timing)
- DDL OPERATIONS:
- CREATE TABLE topic (field1 INT, field2 STRING); - Create topic
- Note: ALTER TABLE and DROP TABLE are not supported
- SPECIAL COMMANDS:
- USE database_name; - Switch database context
- \format table|json|csv - Change output format
- help; - Show this help
- exit; or quit; or \q - Exit interface
- EXTENDED WHERE OPERATORS:
- =, <, >, <=, >= - Comparison operators
- !=, <> - Not equal operators
- LIKE 'pattern%' - Pattern matching (% = any chars, _ = single char)
- IN (value1, value2, ...) - Multi-value matching
- AND, OR - Logical operators
- EXAMPLES:
- SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted';
- SELECT username FROM users WHERE email LIKE '%@company.com';
- SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01';
- EXPLAIN SELECT MAX(id) FROM events; -- View execution plan
- Current Status: Full WHERE clause support + Real MQ integration`)
- }
|