protocol.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893
  1. package postgres
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "strings"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/query/engine"
  12. "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
  13. "github.com/seaweedfs/seaweedfs/weed/util/sqlutil"
  14. "github.com/seaweedfs/seaweedfs/weed/util/version"
  15. )
  16. // mapErrorToPostgreSQLCode maps SeaweedFS SQL engine errors to appropriate PostgreSQL error codes
  17. func mapErrorToPostgreSQLCode(err error) string {
  18. if err == nil {
  19. return "00000" // Success
  20. }
  21. // Use typed errors for robust error mapping
  22. switch err.(type) {
  23. case engine.ParseError:
  24. return "42601" // Syntax error
  25. case engine.TableNotFoundError:
  26. return "42P01" // Undefined table
  27. case engine.ColumnNotFoundError:
  28. return "42703" // Undefined column
  29. case engine.UnsupportedFeatureError:
  30. return "0A000" // Feature not supported
  31. case engine.AggregationError:
  32. // Aggregation errors are usually function-related issues
  33. return "42883" // Undefined function (aggregation function issues)
  34. case engine.DataSourceError:
  35. // Data source errors are usually access or connection issues
  36. return "08000" // Connection exception
  37. case engine.OptimizationError:
  38. // Optimization failures are usually feature limitations
  39. return "0A000" // Feature not supported
  40. case engine.NoSchemaError:
  41. // Topic exists but no schema available
  42. return "42P01" // Undefined table (treat as table not found)
  43. }
  44. // Fallback: analyze error message for backward compatibility with non-typed errors
  45. errLower := strings.ToLower(err.Error())
  46. // Parsing and syntax errors
  47. if strings.Contains(errLower, "parse error") || strings.Contains(errLower, "syntax") {
  48. return "42601" // Syntax error
  49. }
  50. // Unsupported features
  51. if strings.Contains(errLower, "unsupported") || strings.Contains(errLower, "not supported") {
  52. return "0A000" // Feature not supported
  53. }
  54. // Table/topic not found
  55. if strings.Contains(errLower, "not found") ||
  56. (strings.Contains(errLower, "topic") && strings.Contains(errLower, "available")) {
  57. return "42P01" // Undefined table
  58. }
  59. // Column-related errors
  60. if strings.Contains(errLower, "column") || strings.Contains(errLower, "field") {
  61. return "42703" // Undefined column
  62. }
  63. // Multi-table or complex query limitations
  64. if strings.Contains(errLower, "single table") || strings.Contains(errLower, "join") {
  65. return "0A000" // Feature not supported
  66. }
  67. // Default to generic syntax/access error
  68. return "42000" // Syntax error or access rule violation
  69. }
  70. // handleMessage processes a single PostgreSQL protocol message
  71. func (s *PostgreSQLServer) handleMessage(session *PostgreSQLSession) error {
  72. // Read message type
  73. msgType := make([]byte, 1)
  74. _, err := io.ReadFull(session.reader, msgType)
  75. if err != nil {
  76. return err
  77. }
  78. // Read message length
  79. length := make([]byte, 4)
  80. _, err = io.ReadFull(session.reader, length)
  81. if err != nil {
  82. return err
  83. }
  84. msgLength := binary.BigEndian.Uint32(length) - 4
  85. msgBody := make([]byte, msgLength)
  86. if msgLength > 0 {
  87. _, err = io.ReadFull(session.reader, msgBody)
  88. if err != nil {
  89. return err
  90. }
  91. }
  92. // Process message based on type
  93. switch msgType[0] {
  94. case PG_MSG_QUERY:
  95. return s.handleSimpleQuery(session, string(msgBody[:len(msgBody)-1])) // Remove null terminator
  96. case PG_MSG_PARSE:
  97. return s.handleParse(session, msgBody)
  98. case PG_MSG_BIND:
  99. return s.handleBind(session, msgBody)
  100. case PG_MSG_EXECUTE:
  101. return s.handleExecute(session, msgBody)
  102. case PG_MSG_DESCRIBE:
  103. return s.handleDescribe(session, msgBody)
  104. case PG_MSG_CLOSE:
  105. return s.handleClose(session, msgBody)
  106. case PG_MSG_FLUSH:
  107. return s.handleFlush(session)
  108. case PG_MSG_SYNC:
  109. return s.handleSync(session)
  110. case PG_MSG_TERMINATE:
  111. return io.EOF // Signal connection termination
  112. default:
  113. return s.sendError(session, "08P01", fmt.Sprintf("unknown message type: %c", msgType[0]))
  114. }
  115. }
  116. // handleSimpleQuery processes a simple query message
  117. func (s *PostgreSQLServer) handleSimpleQuery(session *PostgreSQLSession, query string) error {
  118. glog.V(2).Infof("PostgreSQL Query (ID: %d): %s", session.processID, query)
  119. // Add comprehensive error recovery to prevent crashes
  120. defer func() {
  121. if r := recover(); r != nil {
  122. glog.Errorf("Panic in handleSimpleQuery (ID: %d): %v", session.processID, r)
  123. // Try to send error message
  124. s.sendError(session, "XX000", fmt.Sprintf("Internal error: %v", r))
  125. // Try to send ReadyForQuery to keep connection alive
  126. s.sendReadyForQuery(session)
  127. }
  128. }()
  129. // Handle USE database commands for session context
  130. parts := strings.Fields(strings.TrimSpace(query))
  131. if len(parts) >= 2 && strings.ToUpper(parts[0]) == "USE" {
  132. // Re-join the parts after "USE" to handle names with spaces, then trim.
  133. dbName := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(query), parts[0]))
  134. // Unquote if necessary (handle quoted identifiers like "my-database")
  135. if len(dbName) > 1 && dbName[0] == '"' && dbName[len(dbName)-1] == '"' {
  136. dbName = dbName[1 : len(dbName)-1]
  137. } else if len(dbName) > 1 && dbName[0] == '`' && dbName[len(dbName)-1] == '`' {
  138. // Also handle backtick quotes for MySQL/other client compatibility
  139. dbName = dbName[1 : len(dbName)-1]
  140. }
  141. session.database = dbName
  142. s.sqlEngine.GetCatalog().SetCurrentDatabase(dbName)
  143. // Send command complete for USE
  144. err := s.sendCommandComplete(session, "USE")
  145. if err != nil {
  146. return err
  147. }
  148. // Send ReadyForQuery and exit (don't continue processing)
  149. return s.sendReadyForQuery(session)
  150. }
  151. // Set database context in SQL engine if session database is different from current
  152. if session.database != "" && session.database != s.sqlEngine.GetCatalog().GetCurrentDatabase() {
  153. s.sqlEngine.GetCatalog().SetCurrentDatabase(session.database)
  154. }
  155. // Split query string into individual statements to handle multi-statement queries
  156. queries := sqlutil.SplitStatements(query)
  157. // Execute each statement sequentially
  158. for _, singleQuery := range queries {
  159. cleanQuery := strings.TrimSpace(singleQuery)
  160. if cleanQuery == "" {
  161. continue // Skip empty statements
  162. }
  163. // Handle PostgreSQL-specific system queries directly
  164. if systemResult := s.handleSystemQuery(session, cleanQuery); systemResult != nil {
  165. err := s.sendSystemQueryResult(session, systemResult, cleanQuery)
  166. if err != nil {
  167. return err
  168. }
  169. continue // Continue with next statement
  170. }
  171. // Execute using PostgreSQL-compatible SQL engine for proper dialect support
  172. ctx := context.Background()
  173. var result *engine.QueryResult
  174. var err error
  175. // Execute SQL query with panic recovery to prevent crashes
  176. func() {
  177. defer func() {
  178. if r := recover(); r != nil {
  179. glog.Errorf("Panic in SQL execution (ID: %d, Query: %s): %v", session.processID, cleanQuery, r)
  180. err = fmt.Errorf("internal error during SQL execution: %v", r)
  181. }
  182. }()
  183. // Use the main sqlEngine (now uses CockroachDB parser for PostgreSQL compatibility)
  184. result, err = s.sqlEngine.ExecuteSQL(ctx, cleanQuery)
  185. }()
  186. if err != nil {
  187. // Send error message but keep connection alive
  188. errorCode := mapErrorToPostgreSQLCode(err)
  189. sendErr := s.sendError(session, errorCode, err.Error())
  190. if sendErr != nil {
  191. return sendErr
  192. }
  193. // Send ReadyForQuery to keep connection alive
  194. return s.sendReadyForQuery(session)
  195. }
  196. if result.Error != nil {
  197. // Send error message but keep connection alive
  198. errorCode := mapErrorToPostgreSQLCode(result.Error)
  199. sendErr := s.sendError(session, errorCode, result.Error.Error())
  200. if sendErr != nil {
  201. return sendErr
  202. }
  203. // Send ReadyForQuery to keep connection alive
  204. return s.sendReadyForQuery(session)
  205. }
  206. // Send results for this statement
  207. if len(result.Columns) > 0 {
  208. // Send row description
  209. err = s.sendRowDescription(session, result)
  210. if err != nil {
  211. return err
  212. }
  213. // Send data rows
  214. for _, row := range result.Rows {
  215. err = s.sendDataRow(session, row)
  216. if err != nil {
  217. return err
  218. }
  219. }
  220. }
  221. // Send command complete for this statement
  222. tag := s.getCommandTag(cleanQuery, len(result.Rows))
  223. err = s.sendCommandComplete(session, tag)
  224. if err != nil {
  225. return err
  226. }
  227. }
  228. // Send ready for query after all statements are processed
  229. return s.sendReadyForQuery(session)
  230. }
  231. // SystemQueryResult represents the result of a system query
  232. type SystemQueryResult struct {
  233. Columns []string
  234. Rows [][]string
  235. }
  236. // handleSystemQuery handles PostgreSQL system queries directly
  237. func (s *PostgreSQLServer) handleSystemQuery(session *PostgreSQLSession, query string) *SystemQueryResult {
  238. // Trim and normalize query
  239. query = strings.TrimSpace(query)
  240. query = strings.TrimSuffix(query, ";")
  241. queryLower := strings.ToLower(query)
  242. // Handle essential PostgreSQL system queries
  243. switch queryLower {
  244. case "select version()":
  245. return &SystemQueryResult{
  246. Columns: []string{"version"},
  247. Rows: [][]string{{fmt.Sprintf("SeaweedFS %s (PostgreSQL 14.0 compatible)", version.VERSION_NUMBER)}},
  248. }
  249. case "select current_database()":
  250. return &SystemQueryResult{
  251. Columns: []string{"current_database"},
  252. Rows: [][]string{{s.config.Database}},
  253. }
  254. case "select current_user":
  255. return &SystemQueryResult{
  256. Columns: []string{"current_user"},
  257. Rows: [][]string{{"seaweedfs"}},
  258. }
  259. case "select current_setting('server_version')":
  260. return &SystemQueryResult{
  261. Columns: []string{"server_version"},
  262. Rows: [][]string{{fmt.Sprintf("%s (SeaweedFS)", version.VERSION_NUMBER)}},
  263. }
  264. case "select current_setting('server_encoding')":
  265. return &SystemQueryResult{
  266. Columns: []string{"server_encoding"},
  267. Rows: [][]string{{"UTF8"}},
  268. }
  269. case "select current_setting('client_encoding')":
  270. return &SystemQueryResult{
  271. Columns: []string{"client_encoding"},
  272. Rows: [][]string{{"UTF8"}},
  273. }
  274. }
  275. // Handle transaction commands (no-op for read-only)
  276. switch queryLower {
  277. case "begin", "start transaction":
  278. return &SystemQueryResult{
  279. Columns: []string{"status"},
  280. Rows: [][]string{{"BEGIN"}},
  281. }
  282. case "commit":
  283. return &SystemQueryResult{
  284. Columns: []string{"status"},
  285. Rows: [][]string{{"COMMIT"}},
  286. }
  287. case "rollback":
  288. return &SystemQueryResult{
  289. Columns: []string{"status"},
  290. Rows: [][]string{{"ROLLBACK"}},
  291. }
  292. }
  293. // If starts with SET, return a no-op
  294. if strings.HasPrefix(queryLower, "set ") {
  295. return &SystemQueryResult{
  296. Columns: []string{"status"},
  297. Rows: [][]string{{"SET"}},
  298. }
  299. }
  300. // Return nil to use SQL engine
  301. return nil
  302. }
  303. // sendSystemQueryResult sends the result of a system query
  304. func (s *PostgreSQLServer) sendSystemQueryResult(session *PostgreSQLSession, result *SystemQueryResult, query string) error {
  305. // Add panic recovery to prevent crashes in system query results
  306. defer func() {
  307. if r := recover(); r != nil {
  308. glog.Errorf("Panic in sendSystemQueryResult (ID: %d, Query: %s): %v", session.processID, query, r)
  309. // Try to send error and continue
  310. s.sendError(session, "XX000", fmt.Sprintf("Internal error in system query: %v", r))
  311. }
  312. }()
  313. // Create column descriptions for system query results
  314. columns := make([]string, len(result.Columns))
  315. for i, col := range result.Columns {
  316. columns[i] = col
  317. }
  318. // Convert to sqltypes.Value format
  319. var sqlRows [][]sqltypes.Value
  320. for _, row := range result.Rows {
  321. sqlRow := make([]sqltypes.Value, len(row))
  322. for i, cell := range row {
  323. sqlRow[i] = sqltypes.NewVarChar(cell)
  324. }
  325. sqlRows = append(sqlRows, sqlRow)
  326. }
  327. // Send row description (create a temporary QueryResult for consistency)
  328. tempResult := &engine.QueryResult{
  329. Columns: columns,
  330. Rows: sqlRows,
  331. }
  332. err := s.sendRowDescription(session, tempResult)
  333. if err != nil {
  334. return err
  335. }
  336. // Send data rows
  337. for _, row := range sqlRows {
  338. err = s.sendDataRow(session, row)
  339. if err != nil {
  340. return err
  341. }
  342. }
  343. // Send command complete
  344. tag := s.getCommandTag(query, len(result.Rows))
  345. err = s.sendCommandComplete(session, tag)
  346. if err != nil {
  347. return err
  348. }
  349. // Send ready for query
  350. return s.sendReadyForQuery(session)
  351. }
  352. // handleParse processes a Parse message (prepared statement)
  353. func (s *PostgreSQLServer) handleParse(session *PostgreSQLSession, msgBody []byte) error {
  354. // Parse message format: statement_name\0query\0param_count(int16)[param_type(int32)...]
  355. parts := strings.Split(string(msgBody), "\x00")
  356. if len(parts) < 2 {
  357. return s.sendError(session, "08P01", "invalid Parse message format")
  358. }
  359. stmtName := parts[0]
  360. query := parts[1]
  361. // Create prepared statement
  362. stmt := &PreparedStatement{
  363. Name: stmtName,
  364. Query: query,
  365. ParamTypes: []uint32{},
  366. Fields: []FieldDescription{},
  367. }
  368. session.preparedStmts[stmtName] = stmt
  369. // Send parse complete
  370. return s.sendParseComplete(session)
  371. }
  372. // handleBind processes a Bind message
  373. func (s *PostgreSQLServer) handleBind(session *PostgreSQLSession, msgBody []byte) error {
  374. // For now, simple implementation
  375. // In full implementation, would parse parameters and create portal
  376. // Send bind complete
  377. return s.sendBindComplete(session)
  378. }
  379. // handleExecute processes an Execute message
  380. func (s *PostgreSQLServer) handleExecute(session *PostgreSQLSession, msgBody []byte) error {
  381. // Parse portal name
  382. parts := strings.Split(string(msgBody), "\x00")
  383. if len(parts) == 0 {
  384. return s.sendError(session, "08P01", "invalid Execute message format")
  385. }
  386. portalName := parts[0]
  387. // For now, execute as simple query
  388. // In full implementation, would use portal with parameters
  389. glog.V(2).Infof("PostgreSQL Execute portal (ID: %d): %s", session.processID, portalName)
  390. // Send command complete
  391. err := s.sendCommandComplete(session, "SELECT 0")
  392. if err != nil {
  393. return err
  394. }
  395. return nil
  396. }
  397. // handleDescribe processes a Describe message
  398. func (s *PostgreSQLServer) handleDescribe(session *PostgreSQLSession, msgBody []byte) error {
  399. if len(msgBody) < 2 {
  400. return s.sendError(session, "08P01", "invalid Describe message format")
  401. }
  402. objectType := msgBody[0] // 'S' for statement, 'P' for portal
  403. objectName := string(msgBody[1:])
  404. glog.V(2).Infof("PostgreSQL Describe %c (ID: %d): %s", objectType, session.processID, objectName)
  405. // For now, send empty row description
  406. tempResult := &engine.QueryResult{
  407. Columns: []string{},
  408. Rows: [][]sqltypes.Value{},
  409. }
  410. return s.sendRowDescription(session, tempResult)
  411. }
  412. // handleClose processes a Close message
  413. func (s *PostgreSQLServer) handleClose(session *PostgreSQLSession, msgBody []byte) error {
  414. if len(msgBody) < 2 {
  415. return s.sendError(session, "08P01", "invalid Close message format")
  416. }
  417. objectType := msgBody[0] // 'S' for statement, 'P' for portal
  418. objectName := string(msgBody[1:])
  419. switch objectType {
  420. case 'S':
  421. delete(session.preparedStmts, objectName)
  422. case 'P':
  423. delete(session.portals, objectName)
  424. }
  425. // Send close complete
  426. return s.sendCloseComplete(session)
  427. }
  428. // handleFlush processes a Flush message
  429. func (s *PostgreSQLServer) handleFlush(session *PostgreSQLSession) error {
  430. return session.writer.Flush()
  431. }
  432. // handleSync processes a Sync message
  433. func (s *PostgreSQLServer) handleSync(session *PostgreSQLSession) error {
  434. // Reset transaction state if needed
  435. session.transactionState = PG_TRANS_IDLE
  436. // Send ready for query
  437. return s.sendReadyForQuery(session)
  438. }
  439. // sendParameterStatus sends a parameter status message
  440. func (s *PostgreSQLServer) sendParameterStatus(session *PostgreSQLSession, name, value string) error {
  441. msg := make([]byte, 0)
  442. msg = append(msg, PG_RESP_PARAMETER)
  443. // Calculate length
  444. length := 4 + len(name) + 1 + len(value) + 1
  445. lengthBytes := make([]byte, 4)
  446. binary.BigEndian.PutUint32(lengthBytes, uint32(length))
  447. msg = append(msg, lengthBytes...)
  448. // Add name and value
  449. msg = append(msg, []byte(name)...)
  450. msg = append(msg, 0) // null terminator
  451. msg = append(msg, []byte(value)...)
  452. msg = append(msg, 0) // null terminator
  453. _, err := session.writer.Write(msg)
  454. if err == nil {
  455. err = session.writer.Flush()
  456. }
  457. return err
  458. }
  459. // sendBackendKeyData sends backend key data
  460. func (s *PostgreSQLServer) sendBackendKeyData(session *PostgreSQLSession) error {
  461. msg := make([]byte, 13)
  462. msg[0] = PG_RESP_BACKEND_KEY
  463. binary.BigEndian.PutUint32(msg[1:5], 12)
  464. binary.BigEndian.PutUint32(msg[5:9], session.processID)
  465. binary.BigEndian.PutUint32(msg[9:13], session.secretKey)
  466. _, err := session.writer.Write(msg)
  467. if err == nil {
  468. err = session.writer.Flush()
  469. }
  470. return err
  471. }
  472. // sendReadyForQuery sends ready for query message
  473. func (s *PostgreSQLServer) sendReadyForQuery(session *PostgreSQLSession) error {
  474. msg := make([]byte, 6)
  475. msg[0] = PG_RESP_READY
  476. binary.BigEndian.PutUint32(msg[1:5], 5)
  477. msg[5] = session.transactionState
  478. _, err := session.writer.Write(msg)
  479. if err == nil {
  480. err = session.writer.Flush()
  481. }
  482. return err
  483. }
  484. // sendRowDescription sends row description message
  485. func (s *PostgreSQLServer) sendRowDescription(session *PostgreSQLSession, result *engine.QueryResult) error {
  486. msg := make([]byte, 0)
  487. msg = append(msg, PG_RESP_ROW_DESC)
  488. // Calculate message length
  489. length := 4 + 2 // length + field count
  490. for _, col := range result.Columns {
  491. length += len(col) + 1 + 4 + 2 + 4 + 2 + 4 + 2 // name + null + tableOID + attrNum + typeOID + typeSize + typeMod + format
  492. }
  493. lengthBytes := make([]byte, 4)
  494. binary.BigEndian.PutUint32(lengthBytes, uint32(length))
  495. msg = append(msg, lengthBytes...)
  496. // Field count
  497. fieldCountBytes := make([]byte, 2)
  498. binary.BigEndian.PutUint16(fieldCountBytes, uint16(len(result.Columns)))
  499. msg = append(msg, fieldCountBytes...)
  500. // Field descriptions
  501. for i, col := range result.Columns {
  502. // Field name
  503. msg = append(msg, []byte(col)...)
  504. msg = append(msg, 0) // null terminator
  505. // Table OID (0 for no table)
  506. tableOID := make([]byte, 4)
  507. binary.BigEndian.PutUint32(tableOID, 0)
  508. msg = append(msg, tableOID...)
  509. // Attribute number
  510. attrNum := make([]byte, 2)
  511. binary.BigEndian.PutUint16(attrNum, uint16(i+1))
  512. msg = append(msg, attrNum...)
  513. // Type OID (determine from schema if available, fallback to data inference)
  514. typeOID := s.getPostgreSQLTypeFromSchema(result, col, i)
  515. typeOIDBytes := make([]byte, 4)
  516. binary.BigEndian.PutUint32(typeOIDBytes, typeOID)
  517. msg = append(msg, typeOIDBytes...)
  518. // Type size (-1 for variable length)
  519. typeSize := make([]byte, 2)
  520. binary.BigEndian.PutUint16(typeSize, 0xFFFF) // -1 as uint16
  521. msg = append(msg, typeSize...)
  522. // Type modifier (-1 for default)
  523. typeMod := make([]byte, 4)
  524. binary.BigEndian.PutUint32(typeMod, 0xFFFFFFFF) // -1 as uint32
  525. msg = append(msg, typeMod...)
  526. // Format (0 for text)
  527. format := make([]byte, 2)
  528. binary.BigEndian.PutUint16(format, 0)
  529. msg = append(msg, format...)
  530. }
  531. _, err := session.writer.Write(msg)
  532. if err == nil {
  533. err = session.writer.Flush()
  534. }
  535. return err
  536. }
  537. // sendDataRow sends a data row message
  538. func (s *PostgreSQLServer) sendDataRow(session *PostgreSQLSession, row []sqltypes.Value) error {
  539. msg := make([]byte, 0)
  540. msg = append(msg, PG_RESP_DATA_ROW)
  541. // Calculate message length
  542. length := 4 + 2 // length + field count
  543. for _, value := range row {
  544. if value.IsNull() {
  545. length += 4 // null value length (-1)
  546. } else {
  547. valueStr := value.ToString()
  548. length += 4 + len(valueStr) // field length + data
  549. }
  550. }
  551. lengthBytes := make([]byte, 4)
  552. binary.BigEndian.PutUint32(lengthBytes, uint32(length))
  553. msg = append(msg, lengthBytes...)
  554. // Field count
  555. fieldCountBytes := make([]byte, 2)
  556. binary.BigEndian.PutUint16(fieldCountBytes, uint16(len(row)))
  557. msg = append(msg, fieldCountBytes...)
  558. // Field values
  559. for _, value := range row {
  560. if value.IsNull() {
  561. // Null value
  562. nullLength := make([]byte, 4)
  563. binary.BigEndian.PutUint32(nullLength, 0xFFFFFFFF) // -1 as uint32
  564. msg = append(msg, nullLength...)
  565. } else {
  566. valueStr := value.ToString()
  567. valueLength := make([]byte, 4)
  568. binary.BigEndian.PutUint32(valueLength, uint32(len(valueStr)))
  569. msg = append(msg, valueLength...)
  570. msg = append(msg, []byte(valueStr)...)
  571. }
  572. }
  573. _, err := session.writer.Write(msg)
  574. if err == nil {
  575. err = session.writer.Flush()
  576. }
  577. return err
  578. }
  579. // sendCommandComplete sends command complete message
  580. func (s *PostgreSQLServer) sendCommandComplete(session *PostgreSQLSession, tag string) error {
  581. msg := make([]byte, 0)
  582. msg = append(msg, PG_RESP_COMMAND)
  583. length := 4 + len(tag) + 1
  584. lengthBytes := make([]byte, 4)
  585. binary.BigEndian.PutUint32(lengthBytes, uint32(length))
  586. msg = append(msg, lengthBytes...)
  587. msg = append(msg, []byte(tag)...)
  588. msg = append(msg, 0) // null terminator
  589. _, err := session.writer.Write(msg)
  590. if err == nil {
  591. err = session.writer.Flush()
  592. }
  593. return err
  594. }
  595. // sendParseComplete sends parse complete message
  596. func (s *PostgreSQLServer) sendParseComplete(session *PostgreSQLSession) error {
  597. msg := make([]byte, 5)
  598. msg[0] = PG_RESP_PARSE_COMPLETE
  599. binary.BigEndian.PutUint32(msg[1:5], 4)
  600. _, err := session.writer.Write(msg)
  601. if err == nil {
  602. err = session.writer.Flush()
  603. }
  604. return err
  605. }
  606. // sendBindComplete sends bind complete message
  607. func (s *PostgreSQLServer) sendBindComplete(session *PostgreSQLSession) error {
  608. msg := make([]byte, 5)
  609. msg[0] = PG_RESP_BIND_COMPLETE
  610. binary.BigEndian.PutUint32(msg[1:5], 4)
  611. _, err := session.writer.Write(msg)
  612. if err == nil {
  613. err = session.writer.Flush()
  614. }
  615. return err
  616. }
  617. // sendCloseComplete sends close complete message
  618. func (s *PostgreSQLServer) sendCloseComplete(session *PostgreSQLSession) error {
  619. msg := make([]byte, 5)
  620. msg[0] = PG_RESP_CLOSE_COMPLETE
  621. binary.BigEndian.PutUint32(msg[1:5], 4)
  622. _, err := session.writer.Write(msg)
  623. if err == nil {
  624. err = session.writer.Flush()
  625. }
  626. return err
  627. }
  628. // sendError sends an error message
  629. func (s *PostgreSQLServer) sendError(session *PostgreSQLSession, code, message string) error {
  630. msg := make([]byte, 0)
  631. msg = append(msg, PG_RESP_ERROR)
  632. // Build error fields
  633. fields := fmt.Sprintf("S%s\x00C%s\x00M%s\x00\x00", "ERROR", code, message)
  634. length := 4 + len(fields)
  635. lengthBytes := make([]byte, 4)
  636. binary.BigEndian.PutUint32(lengthBytes, uint32(length))
  637. msg = append(msg, lengthBytes...)
  638. msg = append(msg, []byte(fields)...)
  639. _, err := session.writer.Write(msg)
  640. if err == nil {
  641. err = session.writer.Flush()
  642. }
  643. return err
  644. }
  645. // getCommandTag generates appropriate command tag for query
  646. func (s *PostgreSQLServer) getCommandTag(query string, rowCount int) string {
  647. queryUpper := strings.ToUpper(strings.TrimSpace(query))
  648. if strings.HasPrefix(queryUpper, "SELECT") {
  649. return fmt.Sprintf("SELECT %d", rowCount)
  650. } else if strings.HasPrefix(queryUpper, "INSERT") {
  651. return fmt.Sprintf("INSERT 0 %d", rowCount)
  652. } else if strings.HasPrefix(queryUpper, "UPDATE") {
  653. return fmt.Sprintf("UPDATE %d", rowCount)
  654. } else if strings.HasPrefix(queryUpper, "DELETE") {
  655. return fmt.Sprintf("DELETE %d", rowCount)
  656. } else if strings.HasPrefix(queryUpper, "SHOW") {
  657. return fmt.Sprintf("SELECT %d", rowCount)
  658. } else if strings.HasPrefix(queryUpper, "DESCRIBE") || strings.HasPrefix(queryUpper, "DESC") {
  659. return fmt.Sprintf("SELECT %d", rowCount)
  660. }
  661. return "SELECT 0"
  662. }
  663. // getPostgreSQLTypeFromSchema determines PostgreSQL type OID from schema information first, fallback to data
  664. func (s *PostgreSQLServer) getPostgreSQLTypeFromSchema(result *engine.QueryResult, columnName string, colIndex int) uint32 {
  665. // Try to get type from schema if database and table are available
  666. if result.Database != "" && result.Table != "" {
  667. if tableInfo, err := s.sqlEngine.GetCatalog().GetTableInfo(result.Database, result.Table); err == nil {
  668. if tableInfo.Schema != nil && tableInfo.Schema.RecordType != nil {
  669. // Look for the field in the schema
  670. for _, field := range tableInfo.Schema.RecordType.Fields {
  671. if field.Name == columnName {
  672. return s.mapSchemaTypeToPostgreSQL(field.Type)
  673. }
  674. }
  675. }
  676. }
  677. }
  678. // Handle system columns
  679. switch columnName {
  680. case "_timestamp_ns":
  681. return PG_TYPE_INT8 // PostgreSQL BIGINT for nanosecond timestamps
  682. case "_key":
  683. return PG_TYPE_BYTEA // PostgreSQL BYTEA for binary keys
  684. case "_source":
  685. return PG_TYPE_TEXT // PostgreSQL TEXT for source information
  686. }
  687. // Fallback to data-based inference if schema is not available
  688. return s.getPostgreSQLTypeFromData(result.Columns, result.Rows, colIndex)
  689. }
  690. // mapSchemaTypeToPostgreSQL maps SeaweedFS schema types to PostgreSQL type OIDs
  691. func (s *PostgreSQLServer) mapSchemaTypeToPostgreSQL(fieldType *schema_pb.Type) uint32 {
  692. if fieldType == nil {
  693. return PG_TYPE_TEXT
  694. }
  695. switch kind := fieldType.Kind.(type) {
  696. case *schema_pb.Type_ScalarType:
  697. switch kind.ScalarType {
  698. case schema_pb.ScalarType_BOOL:
  699. return PG_TYPE_BOOL
  700. case schema_pb.ScalarType_INT32:
  701. return PG_TYPE_INT4
  702. case schema_pb.ScalarType_INT64:
  703. return PG_TYPE_INT8
  704. case schema_pb.ScalarType_FLOAT:
  705. return PG_TYPE_FLOAT4
  706. case schema_pb.ScalarType_DOUBLE:
  707. return PG_TYPE_FLOAT8
  708. case schema_pb.ScalarType_BYTES:
  709. return PG_TYPE_BYTEA
  710. case schema_pb.ScalarType_STRING:
  711. return PG_TYPE_TEXT
  712. default:
  713. return PG_TYPE_TEXT
  714. }
  715. case *schema_pb.Type_ListType:
  716. // For list types, we'll represent them as JSON text
  717. return PG_TYPE_JSONB
  718. case *schema_pb.Type_RecordType:
  719. // For nested record types, we'll represent them as JSON text
  720. return PG_TYPE_JSONB
  721. default:
  722. return PG_TYPE_TEXT
  723. }
  724. }
  725. // getPostgreSQLTypeFromData determines PostgreSQL type OID from data (legacy fallback method)
  726. func (s *PostgreSQLServer) getPostgreSQLTypeFromData(columns []string, rows [][]sqltypes.Value, colIndex int) uint32 {
  727. if len(rows) == 0 || colIndex >= len(rows[0]) {
  728. return PG_TYPE_TEXT // Default to text
  729. }
  730. // Sample first non-null value to determine type
  731. for _, row := range rows {
  732. if colIndex < len(row) && !row[colIndex].IsNull() {
  733. value := row[colIndex]
  734. switch value.Type() {
  735. case sqltypes.Int8, sqltypes.Int16, sqltypes.Int32:
  736. return PG_TYPE_INT4
  737. case sqltypes.Int64:
  738. return PG_TYPE_INT8
  739. case sqltypes.Float32, sqltypes.Float64:
  740. return PG_TYPE_FLOAT8
  741. case sqltypes.Bit:
  742. return PG_TYPE_BOOL
  743. case sqltypes.Timestamp, sqltypes.Datetime:
  744. return PG_TYPE_TIMESTAMP
  745. default:
  746. // Try to infer from string content
  747. valueStr := value.ToString()
  748. if _, err := strconv.ParseInt(valueStr, 10, 32); err == nil {
  749. return PG_TYPE_INT4
  750. }
  751. if _, err := strconv.ParseInt(valueStr, 10, 64); err == nil {
  752. return PG_TYPE_INT8
  753. }
  754. if _, err := strconv.ParseFloat(valueStr, 64); err == nil {
  755. return PG_TYPE_FLOAT8
  756. }
  757. if valueStr == "true" || valueStr == "false" {
  758. return PG_TYPE_BOOL
  759. }
  760. return PG_TYPE_TEXT
  761. }
  762. }
  763. }
  764. return PG_TYPE_TEXT // Default to text
  765. }