| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419 |
- package engine
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/mq/schema"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- )
- // BrokerClientInterface defines the interface for broker client operations
- // Both real BrokerClient and MockBrokerClient implement this interface
- type BrokerClientInterface interface {
- ListNamespaces(ctx context.Context) ([]string, error)
- ListTopics(ctx context.Context, namespace string) ([]string, error)
- GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error)
- GetFilerClient() (filer_pb.FilerClient, error)
- ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
- DeleteTopic(ctx context.Context, namespace, topicName string) error
- // GetUnflushedMessages returns only messages that haven't been flushed to disk yet
- // This prevents double-counting when combining with disk-based data
- GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error)
- }
- // SchemaCatalog manages the mapping between MQ topics and SQL tables
- // Assumptions:
- // 1. Each MQ namespace corresponds to a SQL database
- // 2. Each MQ topic corresponds to a SQL table
- // 3. Topic schemas are cached for performance
- // 4. Schema evolution is tracked via RevisionId
- type SchemaCatalog struct {
- mu sync.RWMutex
- // databases maps namespace names to database metadata
- // Assumption: Namespace names are valid SQL database identifiers
- databases map[string]*DatabaseInfo
- // currentDatabase tracks the active database context (for USE database)
- // Assumption: Single-threaded usage per SQL session
- currentDatabase string
- // brokerClient handles communication with MQ broker
- brokerClient BrokerClientInterface // Use interface for dependency injection
- // defaultPartitionCount is the default number of partitions for new topics
- // Can be overridden in CREATE TABLE statements with PARTITION COUNT option
- defaultPartitionCount int32
- // cacheTTL is the time-to-live for cached database and table information
- // After this duration, cached data is considered stale and will be refreshed
- cacheTTL time.Duration
- }
- // DatabaseInfo represents a SQL database (MQ namespace)
- type DatabaseInfo struct {
- Name string
- Tables map[string]*TableInfo
- CachedAt time.Time // Timestamp when this database info was cached
- }
- // TableInfo represents a SQL table (MQ topic) with schema information
- // Assumptions:
- // 1. All topic messages conform to the same schema within a revision
- // 2. Schema evolution maintains backward compatibility
- // 3. Primary key is implicitly the message timestamp/offset
- type TableInfo struct {
- Name string
- Namespace string
- Schema *schema.Schema
- Columns []ColumnInfo
- RevisionId uint32
- CachedAt time.Time // Timestamp when this table info was cached
- }
// ColumnInfo describes one SQL column, which corresponds to an MQ schema field.
type ColumnInfo struct {
	// Name is the column (field) name.
	Name string
	// Type is the SQL type of the column in its textual form.
	Type string
	// Nullable reports whether the column accepts NULL. MQ fields are assumed
	// to be nullable by default.
	Nullable bool
}
- // NewSchemaCatalog creates a new schema catalog
- // Uses master address for service discovery of filers and brokers
- func NewSchemaCatalog(masterAddress string) *SchemaCatalog {
- return &SchemaCatalog{
- databases: make(map[string]*DatabaseInfo),
- brokerClient: NewBrokerClient(masterAddress),
- defaultPartitionCount: 6, // Default partition count, can be made configurable via environment variable
- cacheTTL: 5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable
- }
- }
- // ListDatabases returns all available databases (MQ namespaces)
- // Assumption: This would be populated from MQ broker metadata
- func (c *SchemaCatalog) ListDatabases() []string {
- // Clean up expired cache entries first
- c.mu.Lock()
- c.cleanExpiredDatabases()
- c.mu.Unlock()
- c.mu.RLock()
- defer c.mu.RUnlock()
- // Try to get real namespaces from broker first
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- namespaces, err := c.brokerClient.ListNamespaces(ctx)
- if err != nil {
- // Silently handle broker connection errors
- // Fallback to cached databases if broker unavailable
- databases := make([]string, 0, len(c.databases))
- for name := range c.databases {
- databases = append(databases, name)
- }
- // Return empty list if no cached data (no more sample data)
- return databases
- }
- return namespaces
- }
- // ListTables returns all tables in a database (MQ topics in namespace)
- func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
- // Clean up expired cache entries first
- c.mu.Lock()
- c.cleanExpiredDatabases()
- c.mu.Unlock()
- c.mu.RLock()
- defer c.mu.RUnlock()
- // Try to get real topics from broker first
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- topics, err := c.brokerClient.ListTopics(ctx, database)
- if err != nil {
- // Fallback to cached data if broker unavailable
- db, exists := c.databases[database]
- if !exists {
- // Return empty list if database not found (no more sample data)
- return []string{}, nil
- }
- tables := make([]string, 0, len(db.Tables))
- for name := range db.Tables {
- tables = append(tables, name)
- }
- return tables, nil
- }
- return topics, nil
- }
- // GetTableInfo returns detailed schema information for a table
- // Assumption: Table exists and schema is accessible
- func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) {
- // Clean up expired cache entries first
- c.mu.Lock()
- c.cleanExpiredDatabases()
- c.mu.Unlock()
- c.mu.RLock()
- db, exists := c.databases[database]
- if !exists {
- c.mu.RUnlock()
- return nil, TableNotFoundError{
- Database: database,
- Table: "",
- }
- }
- tableInfo, exists := db.Tables[table]
- if !exists || c.isTableCacheExpired(tableInfo) {
- c.mu.RUnlock()
- // Try to refresh table info from broker if not found or expired
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table)
- if err != nil {
- // If broker unavailable and we have expired cached data, return it
- if exists {
- return tableInfo, nil
- }
- // Otherwise return not found error
- return nil, TableNotFoundError{
- Database: database,
- Table: table,
- }
- }
- // Convert the broker response to schema and register it
- mqSchema := &schema.Schema{
- RecordType: recordType,
- RevisionId: 1, // Default revision for schema fetched from broker
- }
- // Register the refreshed schema
- err = c.RegisterTopic(database, table, mqSchema)
- if err != nil {
- // If registration fails but we have cached data, return it
- if exists {
- return tableInfo, nil
- }
- return nil, fmt.Errorf("failed to register topic schema: %v", err)
- }
- // Get the newly registered table info
- c.mu.RLock()
- defer c.mu.RUnlock()
- db, exists := c.databases[database]
- if !exists {
- return nil, TableNotFoundError{
- Database: database,
- Table: table,
- }
- }
- tableInfo, exists := db.Tables[table]
- if !exists {
- return nil, TableNotFoundError{
- Database: database,
- Table: table,
- }
- }
- return tableInfo, nil
- }
- c.mu.RUnlock()
- return tableInfo, nil
- }
- // RegisterTopic adds or updates a topic's schema information in the catalog
- // Assumption: This is called when topics are created or schemas are modified
- func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- now := time.Now()
- // Ensure database exists
- db, exists := c.databases[namespace]
- if !exists {
- db = &DatabaseInfo{
- Name: namespace,
- Tables: make(map[string]*TableInfo),
- CachedAt: now,
- }
- c.databases[namespace] = db
- }
- // Convert MQ schema to SQL table info
- tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema)
- if err != nil {
- return fmt.Errorf("failed to convert MQ schema: %v", err)
- }
- // Set the cached timestamp for the table
- tableInfo.CachedAt = now
- db.Tables[topicName] = tableInfo
- return nil
- }
- // convertMQSchemaToTableInfo converts MQ schema to SQL table information
- // Assumptions:
- // 1. MQ scalar types map directly to SQL types
- // 2. Complex types (arrays, maps) are serialized as JSON strings
- // 3. All fields are nullable unless specifically marked otherwise
- func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
- columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
- for i, field := range mqSchema.RecordType.Fields {
- sqlType, err := c.convertMQFieldTypeToSQL(field.Type)
- if err != nil {
- return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err)
- }
- columns[i] = ColumnInfo{
- Name: field.Name,
- Type: sqlType,
- Nullable: true, // Assumption: MQ fields are nullable by default
- }
- }
- return &TableInfo{
- Name: topicName,
- Namespace: namespace,
- Schema: mqSchema,
- Columns: columns,
- RevisionId: mqSchema.RevisionId,
- }, nil
- }
- // convertMQFieldTypeToSQL maps MQ field types to SQL types
- // Uses standard SQL type mappings with PostgreSQL compatibility
- func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) {
- switch t := fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- switch t.ScalarType {
- case schema_pb.ScalarType_BOOL:
- return "BOOLEAN", nil
- case schema_pb.ScalarType_INT32:
- return "INT", nil
- case schema_pb.ScalarType_INT64:
- return "BIGINT", nil
- case schema_pb.ScalarType_FLOAT:
- return "FLOAT", nil
- case schema_pb.ScalarType_DOUBLE:
- return "DOUBLE", nil
- case schema_pb.ScalarType_BYTES:
- return "VARBINARY", nil
- case schema_pb.ScalarType_STRING:
- return "VARCHAR(255)", nil // Assumption: Default string length
- default:
- return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType)
- }
- case *schema_pb.Type_ListType:
- // Assumption: Lists are serialized as JSON strings in SQL
- return "TEXT", nil
- case *schema_pb.Type_RecordType:
- // Assumption: Nested records are serialized as JSON strings
- return "TEXT", nil
- default:
- return "", fmt.Errorf("unsupported field type: %T", t)
- }
- }
- // SetCurrentDatabase sets the active database context
- // Assumption: Used for implementing "USE database" functionality
- func (c *SchemaCatalog) SetCurrentDatabase(database string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- // TODO: Validate database exists in MQ broker
- c.currentDatabase = database
- return nil
- }
- // GetCurrentDatabase returns the currently active database
- func (c *SchemaCatalog) GetCurrentDatabase() string {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.currentDatabase
- }
- // SetDefaultPartitionCount sets the default number of partitions for new topics
- func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.defaultPartitionCount = count
- }
- // GetDefaultPartitionCount returns the default number of partitions for new topics
- func (c *SchemaCatalog) GetDefaultPartitionCount() int32 {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.defaultPartitionCount
- }
- // SetCacheTTL sets the time-to-live for cached database and table information
- func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.cacheTTL = ttl
- }
- // GetCacheTTL returns the current cache TTL setting
- func (c *SchemaCatalog) GetCacheTTL() time.Duration {
- c.mu.RLock()
- defer c.mu.RUnlock()
- return c.cacheTTL
- }
- // isDatabaseCacheExpired checks if a database's cached information has expired
- func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool {
- return time.Since(db.CachedAt) > c.cacheTTL
- }
- // isTableCacheExpired checks if a table's cached information has expired
- func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool {
- return time.Since(table.CachedAt) > c.cacheTTL
- }
- // cleanExpiredDatabases removes expired database entries from cache
- // Note: This method assumes the caller already holds the write lock
- func (c *SchemaCatalog) cleanExpiredDatabases() {
- for name, db := range c.databases {
- if c.isDatabaseCacheExpired(db) {
- delete(c.databases, name)
- } else {
- // Clean expired tables within non-expired databases
- for tableName, table := range db.Tables {
- if c.isTableCacheExpired(table) {
- delete(db.Tables, tableName)
- }
- }
- }
- }
- }
- // CleanExpiredCache removes all expired entries from the cache
- // This method can be called externally to perform periodic cache cleanup
- func (c *SchemaCatalog) CleanExpiredCache() {
- c.mu.Lock()
- defer c.mu.Unlock()
- c.cleanExpiredDatabases()
- }
|