parquet_scanner.go

package engine

import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
)

// ParquetScanner scans MQ topic Parquet files for SELECT queries
// Assumptions:
// 1. All MQ messages are stored in Parquet format in topic partitions
// 2. Each partition directory contains dated Parquet files
// 3. System columns (_timestamp_ns, _key) are added to user schema
// 4. Predicate pushdown is used for efficient scanning
type ParquetScanner struct {
	filerClient   filer_pb.FilerClient
	chunkCache    chunk_cache.ChunkCache
	topic         topic.Topic
	recordSchema  *schema_pb.RecordType
	parquetLevels *schema.ParquetLevels
}

// NewParquetScanner creates a scanner for a specific MQ topic
// Assumption: Topic exists and has Parquet files in partition directories
func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName string) (*ParquetScanner, error) {
	// Check if filerClient is available
	if filerClient == nil {
		return nil, fmt.Errorf("filerClient is required but not available")
	}

	// Create topic reference
	t := topic.Topic{
		Namespace: namespace,
		Name:      topicName,
	}

	// Read topic configuration to get schema
	var topicConf *mq_pb.ConfigureTopicResponse
	var err error
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		topicConf, err = t.ReadConfFile(client)
		return err
	}); err != nil {
		return nil, fmt.Errorf("failed to read topic config: %v", err)
	}

	// Build complete schema with system columns
	recordType := topicConf.GetRecordType()
	if recordType == nil {
		return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
	}

	// Add system columns that MQ adds to all records
	recordType = schema.NewRecordTypeBuilder(recordType).
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		RecordTypeEnd()

	// Convert to Parquet levels for efficient reading
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
	}

	return &ParquetScanner{
		filerClient:   filerClient,
		chunkCache:    chunk_cache.NewChunkCacheInMemory(256), // Same as MQ logstore
		topic:         t,
		recordSchema:  recordType,
		parquetLevels: parquetLevels,
	}, nil
}

// ScanOptions configure how the scanner reads data
type ScanOptions struct {
	// Time range filtering (Unix nanoseconds)
	StartTimeNs int64
	StopTimeNs  int64

	// Column projection - if empty, select all columns
	Columns []string

	// Row limit - 0 means no limit
	Limit int

	// Predicate for WHERE clause filtering
	Predicate func(*schema_pb.RecordValue) bool
}

// ScanResult represents a single scanned record
type ScanResult struct {
	Values    map[string]*schema_pb.Value // Column name -> value
	Timestamp int64                       // Message timestamp (_ts_ns)
	Key       []byte                      // Message key (_key)
}

// Scan reads records from the topic's Parquet files
// Assumptions:
// 1. Scans all partitions of the topic
// 2. Applies time filtering at Parquet level for efficiency
// 3. Applies predicates and projections after reading
func (ps *ParquetScanner) Scan(ctx context.Context, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Get all partitions for this topic
	// TODO: Implement proper partition discovery
	// For now, assume partition 0 exists
	partitions := []topic.Partition{{RangeStart: 0, RangeStop: 1000}}

	for _, partition := range partitions {
		partitionResults, err := ps.scanPartition(ctx, partition, options)
		if err != nil {
			return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
		}
		results = append(results, partitionResults...)

		// Apply global limit across all partitions
		if options.Limit > 0 && len(results) >= options.Limit {
			results = results[:options.Limit]
			break
		}
	}

	return results, nil
}
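
// The function below is an illustrative usage sketch, not part of the scanner
// itself: the namespace, topic name, and column names are hypothetical, and a
// real caller would obtain the filer_pb.FilerClient from its own environment.
// It constructs a scanner, scans the last hour with a projection, a predicate,
// and a row limit, then converts the results into a SQL result set.
func scanRecentLoginsExample(ctx context.Context, filerClient filer_pb.FilerClient) (*QueryResult, error) {
	scanner, err := NewParquetScanner(filerClient, "default", "user_events") // hypothetical topic
	if err != nil {
		return nil, err
	}

	results, err := scanner.Scan(ctx, ScanOptions{
		StartTimeNs: time.Now().Add(-time.Hour).UnixNano(), // only the last hour
		Columns:     []string{"user_id", "event_type"},     // projection
		Limit:       100,                                    // row limit
		Predicate: func(record *schema_pb.RecordValue) bool { // WHERE event_type = 'login'
			return record.Fields["event_type"].GetStringValue() == "login"
		},
	})
	if err != nil {
		return nil, err
	}

	return scanner.ConvertToSQLResult(results, []string{"user_id", "event_type"}), nil
}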

// scanPartition scans a specific topic partition
func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Partition, options ScanOptions) ([]ScanResult, error) {
	// partitionDir := topic.PartitionDir(ps.topic, partition) // TODO: Use for actual file listing

	var results []ScanResult

	// List Parquet files in partition directory
	// TODO: Implement proper file listing with date range filtering
	// For now, this is a placeholder that would list actual Parquet files

	// Simulate file processing - in a real implementation, this would:
	// 1. List files in partitionDir via filerClient
	// 2. Filter files by date range if time filtering is enabled
	// 3. Process each Parquet file in chronological order (see the sketch after this function)

	// Placeholder: Create sample data for testing
	if len(results) == 0 {
		// Generate sample data for demonstration
		sampleData := ps.generateSampleData(options)
		results = append(results, sampleData...)
	}

	return results, nil
}
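
// The helper below is a rough sketch of the per-file loop the real
// scanPartition would run once directory listing is implemented. It is not
// part of the engine: the caller is assumed to have already listed the
// partition directory via the filer, filtered it down to Parquet file
// entries, and ordered them chronologically (the listing call itself is
// omitted here).
func (ps *ParquetScanner) scanListedParquetFiles(ctx context.Context, entries []*filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult
	for _, entry := range entries {
		if entry.IsDirectory {
			continue // only regular Parquet files are scanned
		}

		fileResults, err := ps.scanParquetFile(ctx, entry, options)
		if err != nil {
			return nil, fmt.Errorf("failed to scan file %s: %v", entry.Name, err)
		}
		results = append(results, fileResults...)

		// Stop early once the row limit is satisfied
		if options.Limit > 0 && len(results) >= options.Limit {
			return results[:options.Limit], nil
		}
	}
	return results, nil
}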

// scanParquetFile scans a single Parquet file (real implementation)
func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
	var results []ScanResult

	// Create reader for the Parquet file (same pattern as logstore)
	lookupFileIdFn := filer.LookupFn(ps.filerClient)
	fileSize := filer.FileSize(entry)
	visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
	chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
	readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
	readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))

	// Create Parquet reader
	parquetReader := parquet.NewReader(readerAt)
	defer parquetReader.Close()

	rows := make([]parquet.Row, 128) // Read in batches like logstore
	for {
		rowCount, readErr := parquetReader.ReadRows(rows)

		// Process rows even if EOF
		for i := 0; i < rowCount; i++ {
			// Convert Parquet row to schema value
			recordValue, err := schema.ToRecordValue(ps.recordSchema, ps.parquetLevels, rows[i])
			if err != nil {
				return nil, fmt.Errorf("failed to convert row: %v", err)
			}

			// Extract system columns
			timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
			key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()

			// Apply time filtering
			if options.StartTimeNs > 0 && timestamp < options.StartTimeNs {
				continue
			}
			if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs {
				break // Assume data is time-ordered
			}

			// Apply predicate filtering (WHERE clause)
			if options.Predicate != nil && !options.Predicate(recordValue) {
				continue
			}

			// Apply column projection
			values := make(map[string]*schema_pb.Value)
			if len(options.Columns) == 0 {
				// Select all columns (excluding system columns from user view)
				for name, value := range recordValue.Fields {
					if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
						values[name] = value
					}
				}
			} else {
				// Select specified columns only
				for _, columnName := range options.Columns {
					if value, exists := recordValue.Fields[columnName]; exists {
						values[columnName] = value
					}
				}
			}

			results = append(results, ScanResult{
				Values:    values,
				Timestamp: timestamp,
				Key:       key,
			})

			// Apply row limit
			if options.Limit > 0 && len(results) >= options.Limit {
				return results, nil
			}
		}

		if readErr != nil {
			break // EOF or error
		}
	}

	return results, nil
}
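
// The function below is an illustrative sketch (not part of the engine) of a
// ScanOptions.Predicate implementing a WHERE clause such as
// "event_type = 'login' AND user_id > 1000". The column names are hypothetical
// and depend on the topic schema; a missing column simply fails the filter
// because the protobuf getters return zero values on nil.
func exampleLoginPredicate(record *schema_pb.RecordValue) bool {
	eventType := record.Fields["event_type"].GetStringValue()
	userID := record.Fields["user_id"].GetInt32Value()
	return eventType == "login" && userID > 1000
}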

// generateSampleData creates sample data for testing when no real Parquet files exist
func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
	now := time.Now().UnixNano()

	sampleData := []ScanResult{
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
			},
			Timestamp: now - 3600000000000, // 1 hour ago
			Key:       []byte("user-1001"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "page_view"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
			},
			Timestamp: now - 1800000000000, // 30 minutes ago
			Key:       []byte("user-1002"),
		},
		{
			Values: map[string]*schema_pb.Value{
				"user_id":    {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
				"event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "logout"}},
				"data":       {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
			},
			Timestamp: now - 900000000000, // 15 minutes ago
			Key:       []byte("user-1001"),
		},
	}

	// Apply predicate filtering if specified
	if options.Predicate != nil {
		var filtered []ScanResult
		for _, result := range sampleData {
			// Convert to RecordValue for predicate testing
			recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
			for k, v := range result.Values {
				recordValue.Fields[k] = v
			}
			recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}

			if options.Predicate(recordValue) {
				filtered = append(filtered, result)
			}
		}
		sampleData = filtered
	}

	// Apply limit
	if options.Limit > 0 && len(sampleData) > options.Limit {
		sampleData = sampleData[:options.Limit]
	}

	return sampleData
}

// ConvertToSQLResult converts ScanResults to SQL query results
func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []string) *QueryResult {
	if len(results) == 0 {
		return &QueryResult{
			Columns: columns,
			Rows:    [][]sqltypes.Value{},
		}
	}

	// Determine columns if not specified
	if len(columns) == 0 {
		columnSet := make(map[string]bool)
		for _, result := range results {
			for columnName := range result.Values {
				columnSet[columnName] = true
			}
		}
		columns = make([]string, 0, len(columnSet))
		for columnName := range columnSet {
			columns = append(columns, columnName)
		}
	}

	// Convert to SQL rows
	rows := make([][]sqltypes.Value, len(results))
	for i, result := range results {
		row := make([]sqltypes.Value, len(columns))
		for j, columnName := range columns {
			if value, exists := result.Values[columnName]; exists {
				row[j] = convertSchemaValueToSQL(value)
			} else {
				row[j] = sqltypes.NULL
			}
		}
		rows[i] = row
	}

	return &QueryResult{
		Columns: columns,
		Rows:    rows,
	}
}

// convertSchemaValueToSQL converts schema_pb.Value to sqltypes.Value
func convertSchemaValueToSQL(value *schema_pb.Value) sqltypes.Value {
	if value == nil {
		return sqltypes.NULL
	}

	switch v := value.Kind.(type) {
	case *schema_pb.Value_BoolValue:
		if v.BoolValue {
			return sqltypes.NewInt32(1)
		}
		return sqltypes.NewInt32(0)
	case *schema_pb.Value_Int32Value:
		return sqltypes.NewInt32(v.Int32Value)
	case *schema_pb.Value_Int64Value:
		return sqltypes.NewInt64(v.Int64Value)
	case *schema_pb.Value_FloatValue:
		return sqltypes.NewFloat32(v.FloatValue)
	case *schema_pb.Value_DoubleValue:
		return sqltypes.NewFloat64(v.DoubleValue)
	case *schema_pb.Value_BytesValue:
		return sqltypes.NewVarBinary(string(v.BytesValue))
	case *schema_pb.Value_StringValue:
		return sqltypes.NewVarChar(v.StringValue)
	// Parquet logical types
	case *schema_pb.Value_TimestampValue:
		timestampValue := value.GetTimestampValue()
		if timestampValue == nil {
			return sqltypes.NULL
		}
		// Convert microseconds to time.Time and format as datetime string
		timestamp := time.UnixMicro(timestampValue.TimestampMicros)
		return sqltypes.MakeTrusted(sqltypes.Datetime, []byte(timestamp.Format("2006-01-02 15:04:05")))
	case *schema_pb.Value_DateValue:
		dateValue := value.GetDateValue()
		if dateValue == nil {
			return sqltypes.NULL
		}
		// Convert days since epoch to date string
		date := time.Unix(int64(dateValue.DaysSinceEpoch)*86400, 0).UTC()
		return sqltypes.MakeTrusted(sqltypes.Date, []byte(date.Format("2006-01-02")))
	case *schema_pb.Value_DecimalValue:
		decimalValue := value.GetDecimalValue()
		if decimalValue == nil {
			return sqltypes.NULL
		}
		// Convert decimal bytes to string representation
		decimalStr := decimalToStringHelper(decimalValue)
		return sqltypes.MakeTrusted(sqltypes.Decimal, []byte(decimalStr))
	case *schema_pb.Value_TimeValue:
		timeValue := value.GetTimeValue()
		if timeValue == nil {
			return sqltypes.NULL
		}
		// Convert microseconds since midnight to time string
		duration := time.Duration(timeValue.TimeMicros) * time.Microsecond
		timeOfDay := time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC).Add(duration)
		return sqltypes.MakeTrusted(sqltypes.Time, []byte(timeOfDay.Format("15:04:05")))
	default:
		return sqltypes.NewVarChar(fmt.Sprintf("%v", value))
	}
}

// decimalToStringHelper converts a DecimalValue to string representation
// This is a standalone version of the engine's decimalToString method
func decimalToStringHelper(decimalValue *schema_pb.DecimalValue) string {
	if decimalValue == nil || decimalValue.Value == nil {
		return "0"
	}

	// Convert bytes back to big.Int
	intValue := new(big.Int).SetBytes(decimalValue.Value)

	// Convert to string with proper decimal placement
	str := intValue.String()

	// Handle decimal placement based on scale
	scale := int(decimalValue.Scale)
	if scale > 0 {
		// Pad with leading zeros so values smaller than 10^scale still get a
		// fractional part (e.g. unscaled 5 with scale 3 becomes "0.005")
		for len(str) <= scale {
			str = "0" + str
		}
		// Insert decimal point
		decimalPos := len(str) - scale
		return str[:decimalPos] + "." + str[decimalPos:]
	}
	return str
}
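
// Illustrative example (an assumption about typical input, not taken from the
// engine): an unscaled value of 12345 with scale 2 renders as "123.45", and an
// unscaled value of 5 with scale 3 renders as "0.005" because of the leading
// zero padding above.
//
//	v := &schema_pb.DecimalValue{Value: big.NewInt(12345).Bytes(), Scale: 2}
//	s := decimalToStringHelper(v) // "123.45"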