catalog.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package engine
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  11. )
  12. // BrokerClientInterface defines the interface for broker client operations
  13. // Both real BrokerClient and MockBrokerClient implement this interface
  14. type BrokerClientInterface interface {
  15. ListNamespaces(ctx context.Context) ([]string, error)
  16. ListTopics(ctx context.Context, namespace string) ([]string, error)
  17. GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error)
  18. GetFilerClient() (filer_pb.FilerClient, error)
  19. ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
  20. DeleteTopic(ctx context.Context, namespace, topicName string) error
  21. // GetUnflushedMessages returns only messages that haven't been flushed to disk yet
  22. // This prevents double-counting when combining with disk-based data
  23. GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error)
  24. }
  25. // SchemaCatalog manages the mapping between MQ topics and SQL tables
  26. // Assumptions:
  27. // 1. Each MQ namespace corresponds to a SQL database
  28. // 2. Each MQ topic corresponds to a SQL table
  29. // 3. Topic schemas are cached for performance
  30. // 4. Schema evolution is tracked via RevisionId
  31. type SchemaCatalog struct {
  32. mu sync.RWMutex
  33. // databases maps namespace names to database metadata
  34. // Assumption: Namespace names are valid SQL database identifiers
  35. databases map[string]*DatabaseInfo
  36. // currentDatabase tracks the active database context (for USE database)
  37. // Assumption: Single-threaded usage per SQL session
  38. currentDatabase string
  39. // brokerClient handles communication with MQ broker
  40. brokerClient BrokerClientInterface // Use interface for dependency injection
  41. // defaultPartitionCount is the default number of partitions for new topics
  42. // Can be overridden in CREATE TABLE statements with PARTITION COUNT option
  43. defaultPartitionCount int32
  44. // cacheTTL is the time-to-live for cached database and table information
  45. // After this duration, cached data is considered stale and will be refreshed
  46. cacheTTL time.Duration
  47. }
  48. // DatabaseInfo represents a SQL database (MQ namespace)
  49. type DatabaseInfo struct {
  50. Name string
  51. Tables map[string]*TableInfo
  52. CachedAt time.Time // Timestamp when this database info was cached
  53. }
  54. // TableInfo represents a SQL table (MQ topic) with schema information
  55. // Assumptions:
  56. // 1. All topic messages conform to the same schema within a revision
  57. // 2. Schema evolution maintains backward compatibility
  58. // 3. Primary key is implicitly the message timestamp/offset
  59. type TableInfo struct {
  60. Name string
  61. Namespace string
  62. Schema *schema.Schema
  63. Columns []ColumnInfo
  64. RevisionId uint32
  65. CachedAt time.Time // Timestamp when this table info was cached
  66. }
  // ColumnInfo describes one SQL column (a field of the MQ schema).
  type ColumnInfo struct {
  	// Name is the column (field) name.
  	Name string
  	// Type is the SQL type representation, e.g. "BIGINT".
  	Type string
  	// Nullable reports whether the column accepts NULL.
  	// Assumption: MQ fields are nullable by default
  	Nullable bool
  }
  73. // NewSchemaCatalog creates a new schema catalog
  74. // Uses master address for service discovery of filers and brokers
  75. func NewSchemaCatalog(masterAddress string) *SchemaCatalog {
  76. return &SchemaCatalog{
  77. databases: make(map[string]*DatabaseInfo),
  78. brokerClient: NewBrokerClient(masterAddress),
  79. defaultPartitionCount: 6, // Default partition count, can be made configurable via environment variable
  80. cacheTTL: 5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable
  81. }
  82. }
  83. // ListDatabases returns all available databases (MQ namespaces)
  84. // Assumption: This would be populated from MQ broker metadata
  85. func (c *SchemaCatalog) ListDatabases() []string {
  86. // Clean up expired cache entries first
  87. c.mu.Lock()
  88. c.cleanExpiredDatabases()
  89. c.mu.Unlock()
  90. c.mu.RLock()
  91. defer c.mu.RUnlock()
  92. // Try to get real namespaces from broker first
  93. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  94. defer cancel()
  95. namespaces, err := c.brokerClient.ListNamespaces(ctx)
  96. if err != nil {
  97. // Silently handle broker connection errors
  98. // Fallback to cached databases if broker unavailable
  99. databases := make([]string, 0, len(c.databases))
  100. for name := range c.databases {
  101. databases = append(databases, name)
  102. }
  103. // Return empty list if no cached data (no more sample data)
  104. return databases
  105. }
  106. return namespaces
  107. }
  108. // ListTables returns all tables in a database (MQ topics in namespace)
  109. func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
  110. // Clean up expired cache entries first
  111. c.mu.Lock()
  112. c.cleanExpiredDatabases()
  113. c.mu.Unlock()
  114. c.mu.RLock()
  115. defer c.mu.RUnlock()
  116. // Try to get real topics from broker first
  117. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  118. defer cancel()
  119. topics, err := c.brokerClient.ListTopics(ctx, database)
  120. if err != nil {
  121. // Fallback to cached data if broker unavailable
  122. db, exists := c.databases[database]
  123. if !exists {
  124. // Return empty list if database not found (no more sample data)
  125. return []string{}, nil
  126. }
  127. tables := make([]string, 0, len(db.Tables))
  128. for name := range db.Tables {
  129. tables = append(tables, name)
  130. }
  131. return tables, nil
  132. }
  133. return topics, nil
  134. }
  135. // GetTableInfo returns detailed schema information for a table
  136. // Assumption: Table exists and schema is accessible
  137. func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) {
  138. // Clean up expired cache entries first
  139. c.mu.Lock()
  140. c.cleanExpiredDatabases()
  141. c.mu.Unlock()
  142. c.mu.RLock()
  143. db, exists := c.databases[database]
  144. if !exists {
  145. c.mu.RUnlock()
  146. return nil, TableNotFoundError{
  147. Database: database,
  148. Table: "",
  149. }
  150. }
  151. tableInfo, exists := db.Tables[table]
  152. if !exists || c.isTableCacheExpired(tableInfo) {
  153. c.mu.RUnlock()
  154. // Try to refresh table info from broker if not found or expired
  155. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  156. defer cancel()
  157. recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table)
  158. if err != nil {
  159. // If broker unavailable and we have expired cached data, return it
  160. if exists {
  161. return tableInfo, nil
  162. }
  163. // Otherwise return not found error
  164. return nil, TableNotFoundError{
  165. Database: database,
  166. Table: table,
  167. }
  168. }
  169. // Convert the broker response to schema and register it
  170. mqSchema := &schema.Schema{
  171. RecordType: recordType,
  172. RevisionId: 1, // Default revision for schema fetched from broker
  173. }
  174. // Register the refreshed schema
  175. err = c.RegisterTopic(database, table, mqSchema)
  176. if err != nil {
  177. // If registration fails but we have cached data, return it
  178. if exists {
  179. return tableInfo, nil
  180. }
  181. return nil, fmt.Errorf("failed to register topic schema: %v", err)
  182. }
  183. // Get the newly registered table info
  184. c.mu.RLock()
  185. defer c.mu.RUnlock()
  186. db, exists := c.databases[database]
  187. if !exists {
  188. return nil, TableNotFoundError{
  189. Database: database,
  190. Table: table,
  191. }
  192. }
  193. tableInfo, exists := db.Tables[table]
  194. if !exists {
  195. return nil, TableNotFoundError{
  196. Database: database,
  197. Table: table,
  198. }
  199. }
  200. return tableInfo, nil
  201. }
  202. c.mu.RUnlock()
  203. return tableInfo, nil
  204. }
  205. // RegisterTopic adds or updates a topic's schema information in the catalog
  206. // Assumption: This is called when topics are created or schemas are modified
  207. func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error {
  208. c.mu.Lock()
  209. defer c.mu.Unlock()
  210. now := time.Now()
  211. // Ensure database exists
  212. db, exists := c.databases[namespace]
  213. if !exists {
  214. db = &DatabaseInfo{
  215. Name: namespace,
  216. Tables: make(map[string]*TableInfo),
  217. CachedAt: now,
  218. }
  219. c.databases[namespace] = db
  220. }
  221. // Convert MQ schema to SQL table info
  222. tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema)
  223. if err != nil {
  224. return fmt.Errorf("failed to convert MQ schema: %v", err)
  225. }
  226. // Set the cached timestamp for the table
  227. tableInfo.CachedAt = now
  228. db.Tables[topicName] = tableInfo
  229. return nil
  230. }
  231. // convertMQSchemaToTableInfo converts MQ schema to SQL table information
  232. // Assumptions:
  233. // 1. MQ scalar types map directly to SQL types
  234. // 2. Complex types (arrays, maps) are serialized as JSON strings
  235. // 3. All fields are nullable unless specifically marked otherwise
  236. func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
  237. columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
  238. for i, field := range mqSchema.RecordType.Fields {
  239. sqlType, err := c.convertMQFieldTypeToSQL(field.Type)
  240. if err != nil {
  241. return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err)
  242. }
  243. columns[i] = ColumnInfo{
  244. Name: field.Name,
  245. Type: sqlType,
  246. Nullable: true, // Assumption: MQ fields are nullable by default
  247. }
  248. }
  249. return &TableInfo{
  250. Name: topicName,
  251. Namespace: namespace,
  252. Schema: mqSchema,
  253. Columns: columns,
  254. RevisionId: mqSchema.RevisionId,
  255. }, nil
  256. }
  257. // convertMQFieldTypeToSQL maps MQ field types to SQL types
  258. // Uses standard SQL type mappings with PostgreSQL compatibility
  259. func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) {
  260. switch t := fieldType.Kind.(type) {
  261. case *schema_pb.Type_ScalarType:
  262. switch t.ScalarType {
  263. case schema_pb.ScalarType_BOOL:
  264. return "BOOLEAN", nil
  265. case schema_pb.ScalarType_INT32:
  266. return "INT", nil
  267. case schema_pb.ScalarType_INT64:
  268. return "BIGINT", nil
  269. case schema_pb.ScalarType_FLOAT:
  270. return "FLOAT", nil
  271. case schema_pb.ScalarType_DOUBLE:
  272. return "DOUBLE", nil
  273. case schema_pb.ScalarType_BYTES:
  274. return "VARBINARY", nil
  275. case schema_pb.ScalarType_STRING:
  276. return "VARCHAR(255)", nil // Assumption: Default string length
  277. default:
  278. return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType)
  279. }
  280. case *schema_pb.Type_ListType:
  281. // Assumption: Lists are serialized as JSON strings in SQL
  282. return "TEXT", nil
  283. case *schema_pb.Type_RecordType:
  284. // Assumption: Nested records are serialized as JSON strings
  285. return "TEXT", nil
  286. default:
  287. return "", fmt.Errorf("unsupported field type: %T", t)
  288. }
  289. }
  290. // SetCurrentDatabase sets the active database context
  291. // Assumption: Used for implementing "USE database" functionality
  292. func (c *SchemaCatalog) SetCurrentDatabase(database string) error {
  293. c.mu.Lock()
  294. defer c.mu.Unlock()
  295. // TODO: Validate database exists in MQ broker
  296. c.currentDatabase = database
  297. return nil
  298. }
  299. // GetCurrentDatabase returns the currently active database
  300. func (c *SchemaCatalog) GetCurrentDatabase() string {
  301. c.mu.RLock()
  302. defer c.mu.RUnlock()
  303. return c.currentDatabase
  304. }
  305. // SetDefaultPartitionCount sets the default number of partitions for new topics
  306. func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) {
  307. c.mu.Lock()
  308. defer c.mu.Unlock()
  309. c.defaultPartitionCount = count
  310. }
  311. // GetDefaultPartitionCount returns the default number of partitions for new topics
  312. func (c *SchemaCatalog) GetDefaultPartitionCount() int32 {
  313. c.mu.RLock()
  314. defer c.mu.RUnlock()
  315. return c.defaultPartitionCount
  316. }
  317. // SetCacheTTL sets the time-to-live for cached database and table information
  318. func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) {
  319. c.mu.Lock()
  320. defer c.mu.Unlock()
  321. c.cacheTTL = ttl
  322. }
  323. // GetCacheTTL returns the current cache TTL setting
  324. func (c *SchemaCatalog) GetCacheTTL() time.Duration {
  325. c.mu.RLock()
  326. defer c.mu.RUnlock()
  327. return c.cacheTTL
  328. }
  329. // isDatabaseCacheExpired checks if a database's cached information has expired
  330. func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool {
  331. return time.Since(db.CachedAt) > c.cacheTTL
  332. }
  333. // isTableCacheExpired checks if a table's cached information has expired
  334. func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool {
  335. return time.Since(table.CachedAt) > c.cacheTTL
  336. }
  337. // cleanExpiredDatabases removes expired database entries from cache
  338. // Note: This method assumes the caller already holds the write lock
  339. func (c *SchemaCatalog) cleanExpiredDatabases() {
  340. for name, db := range c.databases {
  341. if c.isDatabaseCacheExpired(db) {
  342. delete(c.databases, name)
  343. } else {
  344. // Clean expired tables within non-expired databases
  345. for tableName, table := range db.Tables {
  346. if c.isTableCacheExpired(table) {
  347. delete(db.Tables, tableName)
  348. }
  349. }
  350. }
  351. }
  352. }
  353. // CleanExpiredCache removes all expired entries from the cache
  354. // This method can be called externally to perform periodic cache cleanup
  355. func (c *SchemaCatalog) CleanExpiredCache() {
  356. c.mu.Lock()
  357. defer c.mu.Unlock()
  358. c.cleanExpiredDatabases()
  359. }