system_columns.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package engine
  2. import (
  3. "strings"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
  6. )
// Internal names of the system columns, shared by the helpers in the SQL engine.
// Lookups in this file lowercase the incoming name before comparing against
// these values, so matching is case-insensitive.
const (
	SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds (internal name)
	SW_COLUMN_NAME_KEY       = "_key"          // Message key
	SW_COLUMN_NAME_SOURCE    = "_source"       // Data source (live_log, parquet_archive, etc.)
)
// User-facing display names of the system columns (what users see).
const (
	// SW_DISPLAY_NAME_TIMESTAMP is the name shown to users in place of the
	// internal "_timestamp_ns" column.
	SW_DISPLAY_NAME_TIMESTAMP = "_ts"
	// Note: _key and _source are displayed under their internal names, so only
	// the timestamp column needs a dedicated display constant.
)
  18. // isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
  19. func (e *SQLEngine) isSystemColumn(columnName string) bool {
  20. lowerName := strings.ToLower(columnName)
  21. return lowerName == SW_COLUMN_NAME_TIMESTAMP ||
  22. lowerName == SW_COLUMN_NAME_KEY ||
  23. lowerName == SW_COLUMN_NAME_SOURCE
  24. }
  25. // isRegularColumn checks if a column might be a regular data column (placeholder)
  26. func (e *SQLEngine) isRegularColumn(columnName string) bool {
  27. // For now, assume any non-system column is a regular column
  28. return !e.isSystemColumn(columnName)
  29. }
  30. // getSystemColumnDisplayName returns the user-facing display name for system columns
  31. func (e *SQLEngine) getSystemColumnDisplayName(columnName string) string {
  32. lowerName := strings.ToLower(columnName)
  33. switch lowerName {
  34. case SW_COLUMN_NAME_TIMESTAMP:
  35. return SW_DISPLAY_NAME_TIMESTAMP
  36. case SW_COLUMN_NAME_KEY:
  37. return SW_COLUMN_NAME_KEY // _key stays the same
  38. case SW_COLUMN_NAME_SOURCE:
  39. return SW_COLUMN_NAME_SOURCE // _source stays the same
  40. default:
  41. return columnName // Return original name for non-system columns
  42. }
  43. }
  44. // isSystemColumnDisplayName checks if a column name is a system column display name
  45. func (e *SQLEngine) isSystemColumnDisplayName(columnName string) bool {
  46. lowerName := strings.ToLower(columnName)
  47. return lowerName == SW_DISPLAY_NAME_TIMESTAMP ||
  48. lowerName == SW_COLUMN_NAME_KEY ||
  49. lowerName == SW_COLUMN_NAME_SOURCE
  50. }
  51. // getSystemColumnInternalName returns the internal name for a system column display name
  52. func (e *SQLEngine) getSystemColumnInternalName(displayName string) string {
  53. lowerName := strings.ToLower(displayName)
  54. switch lowerName {
  55. case SW_DISPLAY_NAME_TIMESTAMP:
  56. return SW_COLUMN_NAME_TIMESTAMP
  57. case SW_COLUMN_NAME_KEY:
  58. return SW_COLUMN_NAME_KEY
  59. case SW_COLUMN_NAME_SOURCE:
  60. return SW_COLUMN_NAME_SOURCE
  61. default:
  62. return displayName // Return original name for non-system columns
  63. }
  64. }
  65. // formatTimestampColumn formats a nanosecond timestamp as a proper timestamp value
  66. func (e *SQLEngine) formatTimestampColumn(timestampNs int64) sqltypes.Value {
  67. // Convert nanoseconds to time.Time
  68. timestamp := time.Unix(timestampNs/1e9, timestampNs%1e9)
  69. // Format as timestamp string in MySQL datetime format
  70. timestampStr := timestamp.UTC().Format("2006-01-02 15:04:05")
  71. // Return as a timestamp value using the Timestamp type
  72. return sqltypes.MakeTrusted(sqltypes.Timestamp, []byte(timestampStr))
  73. }
  74. // getSystemColumnGlobalMin computes global min for system columns using file metadata
  75. func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
  76. lowerName := strings.ToLower(columnName)
  77. switch lowerName {
  78. case SW_COLUMN_NAME_TIMESTAMP:
  79. // For timestamps, find the earliest timestamp across all files
  80. // This should match what's in the Extended["min"] metadata
  81. var minTimestamp *int64
  82. for _, fileStats := range allFileStats {
  83. for _, fileStat := range fileStats {
  84. // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
  85. timestamp := e.extractTimestampFromFilename(fileStat.FileName)
  86. if timestamp != 0 {
  87. if minTimestamp == nil || timestamp < *minTimestamp {
  88. minTimestamp = &timestamp
  89. }
  90. }
  91. }
  92. }
  93. if minTimestamp != nil {
  94. return *minTimestamp
  95. }
  96. case SW_COLUMN_NAME_KEY:
  97. // For keys, we'd need to read the actual parquet column stats
  98. // Fall back to scanning if not available in our current stats
  99. return nil
  100. case SW_COLUMN_NAME_SOURCE:
  101. // Source is always "parquet_archive" for parquet files
  102. return "parquet_archive"
  103. }
  104. return nil
  105. }
  106. // getSystemColumnGlobalMax computes global max for system columns using file metadata
  107. func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
  108. lowerName := strings.ToLower(columnName)
  109. switch lowerName {
  110. case SW_COLUMN_NAME_TIMESTAMP:
  111. // For timestamps, find the latest timestamp across all files
  112. // This should match what's in the Extended["max"] metadata
  113. var maxTimestamp *int64
  114. for _, fileStats := range allFileStats {
  115. for _, fileStat := range fileStats {
  116. // Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
  117. timestamp := e.extractTimestampFromFilename(fileStat.FileName)
  118. if timestamp != 0 {
  119. if maxTimestamp == nil || timestamp > *maxTimestamp {
  120. maxTimestamp = &timestamp
  121. }
  122. }
  123. }
  124. }
  125. if maxTimestamp != nil {
  126. return *maxTimestamp
  127. }
  128. case SW_COLUMN_NAME_KEY:
  129. // For keys, we'd need to read the actual parquet column stats
  130. // Fall back to scanning if not available in our current stats
  131. return nil
  132. case SW_COLUMN_NAME_SOURCE:
  133. // Source is always "parquet_archive" for parquet files
  134. return "parquet_archive"
  135. }
  136. return nil
  137. }