- package engine
- import (
- "context"
- "encoding/binary"
- "encoding/json"
- "fmt"
- "io"
- "math"
- "math/big"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq/schema"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
- "github.com/seaweedfs/seaweedfs/weed/util"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- "google.golang.org/protobuf/proto"
- )
- // SQL Function Name Constants
- const (
- // Aggregation Functions
- FuncCOUNT = "COUNT"
- FuncSUM = "SUM"
- FuncAVG = "AVG"
- FuncMIN = "MIN"
- FuncMAX = "MAX"
- // String Functions
- FuncUPPER = "UPPER"
- FuncLOWER = "LOWER"
- FuncLENGTH = "LENGTH"
- FuncTRIM = "TRIM"
- FuncBTRIM = "BTRIM" // CockroachDB's internal name for TRIM
- FuncLTRIM = "LTRIM"
- FuncRTRIM = "RTRIM"
- FuncSUBSTRING = "SUBSTRING"
- FuncLEFT = "LEFT"
- FuncRIGHT = "RIGHT"
- FuncCONCAT = "CONCAT"
- // DateTime Functions
- FuncCURRENT_DATE = "CURRENT_DATE"
- FuncCURRENT_TIME = "CURRENT_TIME"
- FuncCURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"
- FuncNOW = "NOW"
- FuncEXTRACT = "EXTRACT"
- FuncDATE_TRUNC = "DATE_TRUNC"
- // PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(), MONTH(), etc.
- )
- // PostgreSQL-compatible SQL AST types
- type Statement interface {
- isStatement()
- }
- type ShowStatement struct {
- Type string // "databases", "tables", "columns"
- Table string // for SHOW COLUMNS FROM table
- Schema string // for database context
- OnTable NameRef // for compatibility with existing code that checks OnTable
- }
- func (s *ShowStatement) isStatement() {}
- type UseStatement struct {
- Database string // database name to switch to
- }
- func (u *UseStatement) isStatement() {}
- type DDLStatement struct {
- Action string // "create", "alter", "drop"
- NewName NameRef
- TableSpec *TableSpec
- }
- type NameRef struct {
- Name StringGetter
- Qualifier StringGetter
- }
- type StringGetter interface {
- String() string
- }
- type stringValue string
- func (s stringValue) String() string { return string(s) }
- type TableSpec struct {
- Columns []ColumnDef
- }
- type ColumnDef struct {
- Name StringGetter
- Type TypeRef
- }
- type TypeRef struct {
- Type string
- }
- func (d *DDLStatement) isStatement() {}
- type SelectStatement struct {
- SelectExprs []SelectExpr
- From []TableExpr
- Where *WhereClause
- Limit *LimitClause
- WindowFunctions []*WindowFunction
- }
- type WhereClause struct {
- Expr ExprNode
- }
- type LimitClause struct {
- Rowcount ExprNode
- Offset ExprNode
- }
- func (s *SelectStatement) isStatement() {}
- // Window function types for time-series analytics
- type WindowSpec struct {
- PartitionBy []ExprNode
- OrderBy []*OrderByClause
- }
- type WindowFunction struct {
- Function string // ROW_NUMBER, RANK, LAG, LEAD
- Args []ExprNode // Function arguments
- Over *WindowSpec
- Alias string // Column alias for the result
- }
- type OrderByClause struct {
- Column string
- Order string // ASC or DESC
- }
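- // Illustrative sketch (not from the original source): a fragment like
- // ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS rn
- // would map onto these types roughly as:
- //
- //   &WindowFunction{
- //       Function: "ROW_NUMBER",
- //       Over: &WindowSpec{
- //           PartitionBy: []ExprNode{&ColName{Name: stringValue("user_id")}},
- //           OrderBy:     []*OrderByClause{{Column: "ts", Order: "DESC"}},
- //       },
- //       Alias: "rn",
- //   }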
- type SelectExpr interface {
- isSelectExpr()
- }
- type StarExpr struct{}
- func (s *StarExpr) isSelectExpr() {}
- type AliasedExpr struct {
- Expr ExprNode
- As AliasRef
- }
- type AliasRef interface {
- IsEmpty() bool
- String() string
- }
- type aliasValue string
- func (a aliasValue) IsEmpty() bool { return string(a) == "" }
- func (a aliasValue) String() string { return string(a) }
- func (a *AliasedExpr) isSelectExpr() {}
- type TableExpr interface {
- isTableExpr()
- }
- type AliasedTableExpr struct {
- Expr interface{}
- }
- func (a *AliasedTableExpr) isTableExpr() {}
- type TableName struct {
- Name StringGetter
- Qualifier StringGetter
- }
- type ExprNode interface {
- isExprNode()
- }
- type FuncExpr struct {
- Name StringGetter
- Exprs []SelectExpr
- }
- func (f *FuncExpr) isExprNode() {}
- type ColName struct {
- Name StringGetter
- }
- func (c *ColName) isExprNode() {}
- // ArithmeticExpr represents arithmetic operations like id+user_id and string concatenation like name||suffix
- type ArithmeticExpr struct {
- Left ExprNode
- Right ExprNode
- Operator string // +, -, *, /, %, ||
- }
- func (a *ArithmeticExpr) isExprNode() {}
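- // Illustrative sketch: the expression id + user_id would be represented as
- //
- //   &ArithmeticExpr{
- //       Left:     &ColName{Name: stringValue("id")},
- //       Operator: "+",
- //       Right:    &ColName{Name: stringValue("user_id")},
- //   }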
- type ComparisonExpr struct {
- Left ExprNode
- Right ExprNode
- Operator string
- }
- func (c *ComparisonExpr) isExprNode() {}
- type AndExpr struct {
- Left ExprNode
- Right ExprNode
- }
- func (a *AndExpr) isExprNode() {}
- type OrExpr struct {
- Left ExprNode
- Right ExprNode
- }
- func (o *OrExpr) isExprNode() {}
- type ParenExpr struct {
- Expr ExprNode
- }
- func (p *ParenExpr) isExprNode() {}
- type SQLVal struct {
- Type int
- Val []byte
- }
- func (s *SQLVal) isExprNode() {}
- type ValTuple []ExprNode
- func (v ValTuple) isExprNode() {}
- type IntervalExpr struct {
- Value string // The interval value (e.g., "1 hour", "30 minutes")
- Unit string // The unit (parsed from value)
- }
- func (i *IntervalExpr) isExprNode() {}
- type BetweenExpr struct {
- Left ExprNode // The expression to test
- From ExprNode // Lower bound (inclusive)
- To ExprNode // Upper bound (inclusive)
- Not bool // true for NOT BETWEEN
- }
- func (b *BetweenExpr) isExprNode() {}
- type IsNullExpr struct {
- Expr ExprNode // The expression to test for null
- }
- func (i *IsNullExpr) isExprNode() {}
- type IsNotNullExpr struct {
- Expr ExprNode // The expression to test for not null
- }
- func (i *IsNotNullExpr) isExprNode() {}
- // SQLVal types
- const (
- IntVal = iota
- StrVal
- FloatVal
- )
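- // Illustrative literal encodings (values are stored as raw bytes):
- //
- //   &SQLVal{Type: IntVal, Val: []byte("42")}     // 42
- //   &SQLVal{Type: StrVal, Val: []byte("hello")}  // 'hello'
- //   &SQLVal{Type: FloatVal, Val: []byte("3.14")} // 3.14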
- // Operator constants
- const (
- CreateStr = "create"
- AlterStr = "alter"
- DropStr = "drop"
- EqualStr = "="
- LessThanStr = "<"
- GreaterThanStr = ">"
- LessEqualStr = "<="
- GreaterEqualStr = ">="
- NotEqualStr = "!="
- )
- // parseIdentifier properly parses a potentially quoted identifier (database/table name)
- func parseIdentifier(identifier string) string {
- identifier = strings.TrimSpace(identifier)
- identifier = strings.TrimSuffix(identifier, ";") // Remove trailing semicolon
- // Handle double quotes (PostgreSQL standard)
- if len(identifier) >= 2 && identifier[0] == '"' && identifier[len(identifier)-1] == '"' {
- return identifier[1 : len(identifier)-1]
- }
- // Handle backticks (MySQL compatibility)
- if len(identifier) >= 2 && identifier[0] == '`' && identifier[len(identifier)-1] == '`' {
- return identifier[1 : len(identifier)-1]
- }
- return identifier
- }
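- // Examples (all return my_db):
- //
- //   parseIdentifier(`"my_db"`)  // PostgreSQL double quotes
- //   parseIdentifier("`my_db`")  // MySQL backticks
- //   parseIdentifier("my_db;")   // trailing semicolon stripped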
- // ParseSQL parses PostgreSQL-compatible SQL statements using CockroachDB parser for SELECT queries
- func ParseSQL(sql string) (Statement, error) {
- sql = strings.TrimSpace(sql)
- sqlUpper := strings.ToUpper(sql)
- // Handle USE statement
- if strings.HasPrefix(sqlUpper, "USE ") {
- parts := strings.Fields(sql)
- if len(parts) < 2 {
- return nil, fmt.Errorf("USE statement requires a database name")
- }
- // Parse the database name properly, handling quoted identifiers
- dbName := parseIdentifier(strings.Join(parts[1:], " "))
- return &UseStatement{Database: dbName}, nil
- }
- // Handle DESCRIBE/DESC statements as aliases for SHOW COLUMNS FROM
- if strings.HasPrefix(sqlUpper, "DESCRIBE ") || strings.HasPrefix(sqlUpper, "DESC ") {
- parts := strings.Fields(sql)
- if len(parts) < 2 {
- return nil, fmt.Errorf("DESCRIBE/DESC statement requires a table name")
- }
- var tableName string
- var database string
- // Get the raw table name (before parsing identifiers)
- var rawTableName string
- if len(parts) >= 3 && strings.ToUpper(parts[1]) == "TABLE" {
- rawTableName = parts[2]
- } else {
- rawTableName = parts[1]
- }
- // Parse database.table format first, then apply parseIdentifier to each part
- if strings.Contains(rawTableName, ".") {
- // Handle quoted database.table like "db"."table"
- if strings.HasPrefix(rawTableName, "\"") || strings.HasPrefix(rawTableName, "`") {
- // Find the closing quote and the dot
- var quoteChar byte = '"'
- if rawTableName[0] == '`' {
- quoteChar = '`'
- }
- // Find the matching closing quote
- closingIndex := -1
- for i := 1; i < len(rawTableName); i++ {
- if rawTableName[i] == quoteChar {
- closingIndex = i
- break
- }
- }
- if closingIndex != -1 && closingIndex+1 < len(rawTableName) && rawTableName[closingIndex+1] == '.' {
- // Valid quoted database name
- database = parseIdentifier(rawTableName[:closingIndex+1])
- tableName = parseIdentifier(rawTableName[closingIndex+2:])
- } else {
- // Fall back to simple split then parse
- dbTableParts := strings.SplitN(rawTableName, ".", 2)
- database = parseIdentifier(dbTableParts[0])
- tableName = parseIdentifier(dbTableParts[1])
- }
- } else {
- // Simple case: no quotes, just split then parse
- dbTableParts := strings.SplitN(rawTableName, ".", 2)
- database = parseIdentifier(dbTableParts[0])
- tableName = parseIdentifier(dbTableParts[1])
- }
- } else {
- // No database.table format, just parse the table name
- tableName = parseIdentifier(rawTableName)
- }
- stmt := &ShowStatement{Type: "columns"}
- stmt.OnTable.Name = stringValue(tableName)
- if database != "" {
- stmt.OnTable.Qualifier = stringValue(database)
- }
- return stmt, nil
- }
- // Handle SHOW statements (keep custom parsing for these simple cases)
- if strings.HasPrefix(sqlUpper, "SHOW DATABASES") || strings.HasPrefix(sqlUpper, "SHOW SCHEMAS") {
- return &ShowStatement{Type: "databases"}, nil
- }
- if strings.HasPrefix(sqlUpper, "SHOW TABLES") {
- stmt := &ShowStatement{Type: "tables"}
- // Handle "SHOW TABLES FROM database" syntax
- if strings.Contains(sqlUpper, "FROM") {
- partsUpper := strings.Fields(sqlUpper)
- partsOriginal := strings.Fields(sql) // Use original casing
- for i, part := range partsUpper {
- if part == "FROM" && i+1 < len(partsOriginal) {
- // Parse the database name properly
- dbName := parseIdentifier(partsOriginal[i+1])
- stmt.Schema = dbName // Set the Schema field for the test
- stmt.OnTable.Name = stringValue(dbName) // Keep for compatibility
- break
- }
- }
- }
- return stmt, nil
- }
- if strings.HasPrefix(sqlUpper, "SHOW COLUMNS FROM") {
- // Parse "SHOW COLUMNS FROM table" or "SHOW COLUMNS FROM database.table"
- parts := strings.Fields(sql)
- if len(parts) < 4 {
- return nil, fmt.Errorf("SHOW COLUMNS FROM statement requires a table name")
- }
- // Get the raw table name (before parsing identifiers)
- rawTableName := parts[3]
- var tableName string
- var database string
- // Parse database.table format first, then apply parseIdentifier to each part
- if strings.Contains(rawTableName, ".") {
- // Handle quoted database.table like "db"."table"
- if strings.HasPrefix(rawTableName, "\"") || strings.HasPrefix(rawTableName, "`") {
- // Find the closing quote and the dot
- var quoteChar byte = '"'
- if rawTableName[0] == '`' {
- quoteChar = '`'
- }
- // Find the matching closing quote
- closingIndex := -1
- for i := 1; i < len(rawTableName); i++ {
- if rawTableName[i] == quoteChar {
- closingIndex = i
- break
- }
- }
- if closingIndex != -1 && closingIndex+1 < len(rawTableName) && rawTableName[closingIndex+1] == '.' {
- // Valid quoted database name
- database = parseIdentifier(rawTableName[:closingIndex+1])
- tableName = parseIdentifier(rawTableName[closingIndex+2:])
- } else {
- // Fall back to simple split then parse
- dbTableParts := strings.SplitN(rawTableName, ".", 2)
- database = parseIdentifier(dbTableParts[0])
- tableName = parseIdentifier(dbTableParts[1])
- }
- } else {
- // Simple case: no quotes, just split then parse
- dbTableParts := strings.SplitN(rawTableName, ".", 2)
- database = parseIdentifier(dbTableParts[0])
- tableName = parseIdentifier(dbTableParts[1])
- }
- } else {
- // No database.table format, just parse the table name
- tableName = parseIdentifier(rawTableName)
- }
- stmt := &ShowStatement{Type: "columns"}
- stmt.OnTable.Name = stringValue(tableName)
- if database != "" {
- stmt.OnTable.Qualifier = stringValue(database)
- }
- return stmt, nil
- }
- // Use CockroachDB parser for SELECT statements
- if strings.HasPrefix(sqlUpper, "SELECT") {
- parser := NewCockroachSQLParser()
- return parser.ParseSQL(sql)
- }
- // Guard against empty input before reading the first token
- tokens := strings.Fields(sqlUpper)
- if len(tokens) == 0 {
- return nil, fmt.Errorf("empty SQL statement")
- }
- return nil, UnsupportedFeatureError{
- Feature: fmt.Sprintf("statement type: %s", tokens[0]),
- Reason: "statement parsing not implemented",
- }
- }
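- // Usage sketch (illustrative; error handling elided):
- //
- //   stmt, _ := ParseSQL(`SHOW TABLES FROM "analytics"`)
- //   if show, ok := stmt.(*ShowStatement); ok {
- //       fmt.Println(show.Type, show.Schema) // prints: tables analytics
- //   }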
- // debugModeKey is used to store debug mode flag in context
- type debugModeKey struct{}
- // isDebugMode checks if we're in debug/explain mode
- func isDebugMode(ctx context.Context) bool {
- debug, ok := ctx.Value(debugModeKey{}).(bool)
- return ok && debug
- }
- // withDebugMode returns a context with debug mode enabled
- func withDebugMode(ctx context.Context) context.Context {
- return context.WithValue(ctx, debugModeKey{}, true)
- }
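- // Illustrative usage: EXPLAIN wraps the query context so downstream scan code
- // records plan details only when requested:
- //
- //   ctx := withDebugMode(context.Background())
- //   if isDebugMode(ctx) {
- //       // collect execution-plan details
- //   }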
- // LogBufferStart tracks the starting buffer index for a file
- // Buffer indexes are monotonically increasing, count = len(chunks)
- type LogBufferStart struct {
- StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
- }
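- // With encoding/json, a file whose chunks begin at buffer index 7 serializes
- // as {"start_index":7}.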
- // SQLEngine provides SQL query execution capabilities for SeaweedFS
- // Assumptions:
- // 1. MQ namespaces map directly to SQL databases
- // 2. MQ topics map directly to SQL tables
- // 3. Schema evolution is handled transparently with backward compatibility
- // 4. Queries run against Parquet-stored MQ messages
- type SQLEngine struct {
- catalog *SchemaCatalog
- }
- // NewSQLEngine creates a new SQL execution engine
- // Uses master address for service discovery and initialization
- func NewSQLEngine(masterAddress string) *SQLEngine {
- // Initialize global HTTP client if not already done
- // This is needed for reading partition data from the filer
- if util_http.GetGlobalHttpClient() == nil {
- util_http.InitGlobalHttpClient()
- }
- return &SQLEngine{
- catalog: NewSchemaCatalog(masterAddress),
- }
- }
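- // Usage sketch (illustrative; assumes a master reachable at localhost:9333):
- //
- //   engine := NewSQLEngine("localhost:9333")
- //   result, err := engine.ExecuteSQL(context.Background(), "SHOW DATABASES")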
- // NewSQLEngineWithCatalog creates a new SQL execution engine with a custom catalog
- // Used for testing or when you want to provide a pre-configured catalog
- func NewSQLEngineWithCatalog(catalog *SchemaCatalog) *SQLEngine {
- // Initialize global HTTP client if not already done
- // This is needed for reading partition data from the filer
- if util_http.GetGlobalHttpClient() == nil {
- util_http.InitGlobalHttpClient()
- }
- return &SQLEngine{
- catalog: catalog,
- }
- }
- // GetCatalog returns the schema catalog for external access
- func (e *SQLEngine) GetCatalog() *SchemaCatalog {
- return e.catalog
- }
- // ExecuteSQL parses and executes a SQL statement
- // Assumptions:
- // 1. All SQL statements use PostgreSQL-compatible syntax (SELECTs are parsed by the CockroachDB parser)
- // 2. DDL operations (CREATE/ALTER/DROP) modify underlying MQ topics
- // 3. DML operations (SELECT) query Parquet files directly
- // 4. Error handling follows PostgreSQL conventions
- func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
- startTime := time.Now()
- // Handle EXPLAIN as a special case
- sqlTrimmed := strings.TrimSpace(sql)
- sqlUpper := strings.ToUpper(sqlTrimmed)
- if strings.HasPrefix(sqlUpper, "EXPLAIN") {
- // Extract the actual query after EXPLAIN
- actualSQL := strings.TrimSpace(sqlTrimmed[len("EXPLAIN"):]) // strip the EXPLAIN keyword
- return e.executeExplain(ctx, actualSQL, startTime)
- }
- // Parse the SQL statement using PostgreSQL parser
- stmt, err := ParseSQL(sql)
- if err != nil {
- return &QueryResult{
- Error: fmt.Errorf("SQL parse error: %v", err),
- }, err
- }
- // Route to appropriate handler based on statement type
- switch stmt := stmt.(type) {
- case *ShowStatement:
- return e.executeShowStatementWithDescribe(ctx, stmt)
- case *UseStatement:
- return e.executeUseStatement(ctx, stmt)
- case *DDLStatement:
- return e.executeDDLStatement(ctx, stmt)
- case *SelectStatement:
- return e.executeSelectStatement(ctx, stmt)
- default:
- err := fmt.Errorf("unsupported SQL statement type: %T", stmt)
- return &QueryResult{Error: err}, err
- }
- }
- // executeExplain handles EXPLAIN statements by executing the query with plan tracking
- func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
- // Enable debug mode for EXPLAIN queries
- ctx = withDebugMode(ctx)
- // Parse the actual SQL statement using PostgreSQL parser
- stmt, err := ParseSQL(actualSQL)
- if err != nil {
- return &QueryResult{
- Error: fmt.Errorf("SQL parse error in EXPLAIN query: %v", err),
- }, err
- }
- // Create execution plan
- plan := &QueryExecutionPlan{
- QueryType: strings.ToUpper(strings.Fields(actualSQL)[0]),
- DataSources: []string{},
- OptimizationsUsed: []string{},
- Details: make(map[string]interface{}),
- }
- var result *QueryResult
- // Route to appropriate handler based on statement type (with plan tracking)
- switch stmt := stmt.(type) {
- case *SelectStatement:
- result, err = e.executeSelectStatementWithPlan(ctx, stmt, plan)
- if err != nil {
- plan.Details["error"] = err.Error()
- }
- case *ShowStatement:
- plan.QueryType = "SHOW"
- plan.ExecutionStrategy = "metadata_only"
- result, err = e.executeShowStatementWithDescribe(ctx, stmt)
- default:
- err := fmt.Errorf("EXPLAIN not supported for statement type: %T", stmt)
- return &QueryResult{Error: err}, err
- }
- // Calculate execution time
- plan.ExecutionTimeMs = float64(time.Since(startTime).Nanoseconds()) / 1e6
- // Format execution plan as result
- return e.formatExecutionPlan(plan, result, err)
- }
- // formatExecutionPlan converts execution plan to a hierarchical tree format for display
- func (e *SQLEngine) formatExecutionPlan(plan *QueryExecutionPlan, originalResult *QueryResult, originalErr error) (*QueryResult, error) {
- columns := []string{"Query Execution Plan"}
- rows := [][]sqltypes.Value{}
- var planLines []string
- // Use new tree structure if available, otherwise fallback to legacy format
- if plan.RootNode != nil {
- planLines = e.buildTreePlan(plan, originalErr)
- } else {
- // Build legacy hierarchical plan display
- planLines = e.buildHierarchicalPlan(plan, originalErr)
- }
- for _, line := range planLines {
- rows = append(rows, []sqltypes.Value{
- sqltypes.NewVarChar(line),
- })
- }
- if originalErr != nil {
- return &QueryResult{
- Columns: columns,
- Rows: rows,
- ExecutionPlan: plan,
- Error: originalErr,
- }, originalErr
- }
- return &QueryResult{
- Columns: columns,
- Rows: rows,
- ExecutionPlan: plan,
- }, nil
- }
- // buildTreePlan creates the new tree-based execution plan display
- func (e *SQLEngine) buildTreePlan(plan *QueryExecutionPlan, err error) []string {
- var lines []string
- // Root header
- lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))
- // Build the execution tree
- if plan.RootNode != nil {
- // Root execution node is always the last (and only) child of SELECT Query
- treeLines := e.formatExecutionNode(plan.RootNode, "└── ", " ", true)
- lines = append(lines, treeLines...)
- }
- // Add error information if present
- if err != nil {
- lines = append(lines, "")
- lines = append(lines, fmt.Sprintf("Error: %v", err))
- }
- return lines
- }
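- // Illustrative output shape for a hypothetical query that touches both parquet
- // files and live logs (abridged; file-level detail omitted, strategy name assumed):
- //
- //   SELECT Query (hybrid_scan)
- //   └── Chronological Merge (time-ordered)
- //       ├── Strategy: timestamp_based
- //       ├── Parquet File Scan (2 files)
- //       └── Live Log Scan (1 files)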
- // formatExecutionNode recursively formats execution tree nodes
- func (e *SQLEngine) formatExecutionNode(node ExecutionNode, prefix, childPrefix string, isRoot bool) []string {
- var lines []string
- description := node.GetDescription()
- // Format the current node (root and child nodes share the same layout)
- lines = append(lines, fmt.Sprintf("%s%s", prefix, description))
- // Add node-specific details
- switch n := node.(type) {
- case *FileSourceNode:
- lines = e.formatFileSourceDetails(lines, n, childPrefix, isRoot)
- case *ScanOperationNode:
- lines = e.formatScanOperationDetails(lines, n, childPrefix, isRoot)
- case *MergeOperationNode:
- lines = e.formatMergeOperationDetails(lines, n, childPrefix, isRoot)
- }
- // Format children
- children := node.GetChildren()
- if len(children) > 0 {
- for i, child := range children {
- isLastChild := i == len(children)-1
- var nextPrefix, nextChildPrefix string
- if isLastChild {
- nextPrefix = childPrefix + "└── "
- nextChildPrefix = childPrefix + " "
- } else {
- nextPrefix = childPrefix + "├── "
- nextChildPrefix = childPrefix + "│ "
- }
- childLines := e.formatExecutionNode(child, nextPrefix, nextChildPrefix, false)
- lines = append(lines, childLines...)
- }
- }
- return lines
- }
- // formatFileSourceDetails adds details for file source nodes
- func (e *SQLEngine) formatFileSourceDetails(lines []string, node *FileSourceNode, childPrefix string, isRoot bool) []string {
- prefix := childPrefix
- if isRoot {
- prefix = "│ "
- }
- // Add predicates; use └── when they are the last detail (no operations follow)
- if len(node.Predicates) > 0 {
- branch := "├── "
- if len(node.Operations) == 0 {
- branch = "└── "
- }
- lines = append(lines, fmt.Sprintf("%s%sPredicates: %s", prefix, branch, strings.Join(node.Predicates, " AND ")))
- }
- // Add operations
- if len(node.Operations) > 0 {
- lines = append(lines, fmt.Sprintf("%s└── Operations: %s", prefix, strings.Join(node.Operations, " + ")))
- } else if len(node.Predicates) == 0 {
- lines = append(lines, fmt.Sprintf("%s└── Operation: full_scan", prefix))
- }
- return lines
- }
- // formatScanOperationDetails adds details for scan operation nodes
- func (e *SQLEngine) formatScanOperationDetails(lines []string, node *ScanOperationNode, childPrefix string, isRoot bool) []string {
- prefix := childPrefix
- if isRoot {
- prefix = "│ "
- }
- hasChildren := len(node.Children) > 0
- // Add predicates if present
- if len(node.Predicates) > 0 {
- if hasChildren {
- lines = append(lines, fmt.Sprintf("%s├── Predicates: %s", prefix, strings.Join(node.Predicates, " AND ")))
- } else {
- lines = append(lines, fmt.Sprintf("%s└── Predicates: %s", prefix, strings.Join(node.Predicates, " AND ")))
- }
- }
- return lines
- }
- // formatMergeOperationDetails adds details for merge operation nodes
- func (e *SQLEngine) formatMergeOperationDetails(lines []string, node *MergeOperationNode, childPrefix string, isRoot bool) []string {
- hasChildren := len(node.Children) > 0
- // Add merge strategy info only if we have children, with proper indentation
- if strategy, exists := node.Details["merge_strategy"]; exists && hasChildren {
- // Strategy should be indented as a detail of this node, before its children
- lines = append(lines, fmt.Sprintf("%s├── Strategy: %v", childPrefix, strategy))
- }
- return lines
- }
- // buildHierarchicalPlan creates a tree-like structure for the execution plan
- func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) []string {
- var lines []string
- // Root node - Query type and strategy
- lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))
- // Aggregations section (if present)
- if len(plan.Aggregations) > 0 {
- lines = append(lines, "├── Aggregations")
- for i, agg := range plan.Aggregations {
- if i == len(plan.Aggregations)-1 {
- lines = append(lines, fmt.Sprintf("│ └── %s", agg))
- } else {
- lines = append(lines, fmt.Sprintf("│ ├── %s", agg))
- }
- }
- }
- // Data Sources section
- if len(plan.DataSources) > 0 {
- hasMore := len(plan.OptimizationsUsed) > 0 || plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
- if hasMore {
- lines = append(lines, "├── Data Sources")
- } else {
- lines = append(lines, "└── Data Sources")
- }
- for i, source := range plan.DataSources {
- // When this is the last top-level section, drop the │ rail for all of its children
- prefix := "│ "
- if !hasMore {
- prefix = " "
- }
- if i == len(plan.DataSources)-1 {
- lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatDataSource(source)))
- } else {
- lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatDataSource(source)))
- }
- }
- }
- // Optimizations section
- if len(plan.OptimizationsUsed) > 0 {
- hasMore := plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil
- if hasMore {
- lines = append(lines, "├── Optimizations")
- } else {
- lines = append(lines, "└── Optimizations")
- }
- for i, opt := range plan.OptimizationsUsed {
- // When this is the last top-level section, drop the │ rail for all of its children
- prefix := "│ "
- if !hasMore {
- prefix = " "
- }
- if i == len(plan.OptimizationsUsed)-1 {
- lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatOptimization(opt)))
- } else {
- lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatOptimization(opt)))
- }
- }
- }
- // Check for data sources tree availability
- partitionPaths, hasPartitions := plan.Details["partition_paths"].([]string)
- parquetFiles, _ := plan.Details["parquet_files"].([]string)
- liveLogFiles, _ := plan.Details["live_log_files"].([]string)
- // Statistics section
- statisticsPresent := plan.PartitionsScanned > 0 || plan.ParquetFilesScanned > 0 ||
- plan.LiveLogFilesScanned > 0 || plan.TotalRowsProcessed > 0
- if statisticsPresent {
- // The Performance section always follows Statistics, so this section is never last
- lines = append(lines, "├── Statistics")
- stats := []string{}
- if plan.PartitionsScanned > 0 {
- stats = append(stats, fmt.Sprintf("Partitions Scanned: %d", plan.PartitionsScanned))
- }
- if plan.ParquetFilesScanned > 0 {
- stats = append(stats, fmt.Sprintf("Parquet Files: %d", plan.ParquetFilesScanned))
- }
- if plan.LiveLogFilesScanned > 0 {
- stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned))
- }
- // Always show row statistics for aggregations, even if 0 (to show fast path efficiency)
- if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults {
- stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed))
- stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned))
- // Add fast path explanation when no rows were scanned
- if plan.TotalRowsProcessed == 0 {
- // Use the actual scan method from Details instead of hardcoding
- if scanMethod, exists := plan.Details["scan_method"].(string); exists {
- stats = append(stats, fmt.Sprintf("Scan Method: %s", scanMethod))
- } else {
- stats = append(stats, "Scan Method: Metadata Only")
- }
- }
- } else if plan.TotalRowsProcessed > 0 {
- stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed))
- }
- // Broker buffer information
- if plan.BrokerBufferQueried {
- stats = append(stats, fmt.Sprintf("Broker Buffer Queried: Yes (%d messages)", plan.BrokerBufferMessages))
- if plan.BufferStartIndex > 0 {
- stats = append(stats, fmt.Sprintf("Buffer Start Index: %d (deduplication enabled)", plan.BufferStartIndex))
- }
- }
- for i, stat := range stats {
- // Performance always follows Statistics, so keep the │ rail for these entries
- if i == len(stats)-1 {
- lines = append(lines, fmt.Sprintf("│ └── %s", stat))
- } else {
- lines = append(lines, fmt.Sprintf("│ ├── %s", stat))
- }
- }
- }
- // Data Sources Tree section (if file paths are available)
- if hasPartitions && len(partitionPaths) > 0 {
- // Performance always comes after this section, so it is never rendered last
- lines = append(lines, "├── Data Sources Tree")
- // Build a tree structure for each partition
- for i, partition := range partitionPaths {
- isLastPartition := i == len(partitionPaths)-1
- // Show partition directory
- partitionPrefix := "├── "
- if isLastPartition {
- partitionPrefix = "└── "
- }
- lines = append(lines, fmt.Sprintf("│ %s%s/", partitionPrefix, partition))
- // Show parquet files in this partition
- partitionParquetFiles := make([]string, 0)
- for _, file := range parquetFiles {
- if strings.HasPrefix(file, partition+"/") {
- fileName := file[len(partition)+1:]
- partitionParquetFiles = append(partitionParquetFiles, fileName)
- }
- }
- // Show live log files in this partition
- partitionLiveLogFiles := make([]string, 0)
- for _, file := range liveLogFiles {
- if strings.HasPrefix(file, partition+"/") {
- fileName := file[len(partition)+1:]
- partitionLiveLogFiles = append(partitionLiveLogFiles, fileName)
- }
- }
- // Display files with proper tree formatting
- totalFiles := len(partitionParquetFiles) + len(partitionLiveLogFiles)
- fileIndex := 0
- // Display parquet files
- for _, fileName := range partitionParquetFiles {
- fileIndex++
- isLastFile := fileIndex == totalFiles && isLastPartition
- var filePrefix string
- if isLastPartition {
- if isLastFile {
- filePrefix = " └── "
- } else {
- filePrefix = " ├── "
- }
- } else {
- if isLastFile {
- filePrefix = "│ └── "
- } else {
- filePrefix = "│ ├── "
- }
- }
- lines = append(lines, fmt.Sprintf("│ %s%s (parquet)", filePrefix, fileName))
- }
- // Display live log files
- for _, fileName := range partitionLiveLogFiles {
- fileIndex++
- isLastFile := fileIndex == totalFiles && isLastPartition
- var filePrefix string
- if isLastPartition {
- if isLastFile {
- filePrefix = " └── "
- } else {
- filePrefix = " ├── "
- }
- } else {
- if isLastFile {
- filePrefix = "│ └── "
- } else {
- filePrefix = "│ ├── "
- }
- }
- lines = append(lines, fmt.Sprintf("│ %s%s (live log)", filePrefix, fileName))
- }
- }
- }
- // Details section
- // Filter out details that are shown elsewhere
- filteredDetails := make([]string, 0)
- for key, value := range plan.Details {
- // Skip keys that are already formatted and displayed in the Statistics section
- if key != "results_returned" && key != "partition_paths" && key != "parquet_files" && key != "live_log_files" {
- filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value))
- }
- }
- if len(filteredDetails) > 0 {
- // Performance always comes after, so this section is never rendered last
- lines = append(lines, "├── Details")
- for i, detail := range filteredDetails {
- if i == len(filteredDetails)-1 {
- lines = append(lines, fmt.Sprintf("│ └── %s", detail))
- } else {
- lines = append(lines, fmt.Sprintf("│ ├── %s", detail))
- }
- }
- }
- // Performance section (always present)
- if err != nil {
- lines = append(lines, "├── Performance")
- lines = append(lines, fmt.Sprintf("│ └── Execution Time: %.3fms", plan.ExecutionTimeMs))
- lines = append(lines, "└── Error")
- lines = append(lines, fmt.Sprintf(" └── %s", err.Error()))
- } else {
- lines = append(lines, "└── Performance")
- lines = append(lines, fmt.Sprintf(" └── Execution Time: %.3fms", plan.ExecutionTimeMs))
- }
- return lines
- }
- // formatDataSource provides user-friendly names for data sources
- func (e *SQLEngine) formatDataSource(source string) string {
- switch source {
- case "parquet_stats":
- return "Parquet Statistics (fast path)"
- case "parquet_files":
- return "Parquet Files (full scan)"
- case "live_logs":
- return "Live Log Files"
- case "broker_buffer":
- return "Broker Buffer (real-time)"
- default:
- return source
- }
- }
- // buildExecutionTree creates a tree representation of the query execution plan
- func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectStatement) ExecutionNode {
- // Extract WHERE clause predicates for pushdown analysis
- var predicates []string
- if stmt.Where != nil {
- predicates = e.extractPredicateStrings(stmt.Where.Expr)
- }
- // Check if we have detailed file information
- partitionPaths, hasPartitions := plan.Details["partition_paths"].([]string)
- parquetFiles, hasParquetFiles := plan.Details["parquet_files"].([]string)
- liveLogFiles, hasLiveLogFiles := plan.Details["live_log_files"].([]string)
- if !hasPartitions || len(partitionPaths) == 0 {
- // Fallback: create simple structure without file details
- return &ScanOperationNode{
- ScanType: "hybrid_scan",
- Description: fmt.Sprintf("Hybrid Scan (%s)", plan.ExecutionStrategy),
- Predicates: predicates,
- Details: map[string]interface{}{
- "note": "File details not available",
- },
- }
- }
- // Build file source nodes
- var parquetNodes []ExecutionNode
- var liveLogNodes []ExecutionNode
- var brokerBufferNodes []ExecutionNode
- // Create parquet file nodes
- if hasParquetFiles {
- for _, filePath := range parquetFiles {
- operations := e.determineParquetOperations(plan, filePath)
- parquetNodes = append(parquetNodes, &FileSourceNode{
- FilePath: filePath,
- SourceType: "parquet",
- Predicates: predicates,
- Operations: operations,
- OptimizationHint: e.determineOptimizationHint(plan, "parquet"),
- Details: map[string]interface{}{
- "format": "parquet",
- },
- })
- }
- }
- // Create live log file nodes
- if hasLiveLogFiles {
- for _, filePath := range liveLogFiles {
- operations := e.determineLiveLogOperations(plan, filePath)
- liveLogNodes = append(liveLogNodes, &FileSourceNode{
- FilePath: filePath,
- SourceType: "live_log",
- Predicates: predicates,
- Operations: operations,
- OptimizationHint: e.determineOptimizationHint(plan, "live_log"),
- Details: map[string]interface{}{
- "format": "log_entry",
- },
- })
- }
- }
- // Create broker buffer node only if queried AND has unflushed messages
- if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 {
- brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{
- FilePath: "broker_memory_buffer",
- SourceType: "broker_buffer",
- Predicates: predicates,
- Operations: []string{"memory_scan"},
- OptimizationHint: "real_time",
- Details: map[string]interface{}{
- "messages": plan.BrokerBufferMessages,
- "buffer_start_idx": plan.BufferStartIndex,
- },
- })
- }
- // Build the tree structure based on data sources
- var scanNodes []ExecutionNode
- // Add parquet scan node ONLY if there are actual parquet files
- if len(parquetNodes) > 0 {
- scanNodes = append(scanNodes, &ScanOperationNode{
- ScanType: "parquet_scan",
- Description: fmt.Sprintf("Parquet File Scan (%d files)", len(parquetNodes)),
- Predicates: predicates,
- Children: parquetNodes,
- Details: map[string]interface{}{
- "files_count": len(parquetNodes),
- "pushdown": "column_projection + predicate_filtering",
- },
- })
- }
- // Add live log scan node ONLY if there are actual live log files
- if len(liveLogNodes) > 0 {
- scanNodes = append(scanNodes, &ScanOperationNode{
- ScanType: "live_log_scan",
- Description: fmt.Sprintf("Live Log Scan (%d files)", len(liveLogNodes)),
- Predicates: predicates,
- Children: liveLogNodes,
- Details: map[string]interface{}{
- "files_count": len(liveLogNodes),
- "pushdown": "predicate_filtering",
- },
- })
- }
- // Add broker buffer scan node ONLY if buffer was actually queried
- if len(brokerBufferNodes) > 0 {
- scanNodes = append(scanNodes, &ScanOperationNode{
- ScanType: "broker_buffer_scan",
- Description: "Real-time Buffer Scan",
- Predicates: predicates,
- Children: brokerBufferNodes,
- Details: map[string]interface{}{
- "real_time": true,
- },
- })
- }
- // Fallback: no concrete source files were discovered.
- // Note: scanNodes is non-empty whenever any file nodes exist (one scan node is
- // appended per non-empty source type above), so a single check covers both cases.
- totalFileNodes := len(parquetNodes) + len(liveLogNodes) + len(brokerBufferNodes)
- if totalFileNodes == 0 {
- return &ScanOperationNode{
- ScanType: "hybrid_scan",
- Description: fmt.Sprintf("Hybrid Scan (%s)", plan.ExecutionStrategy),
- Predicates: predicates,
- Details: map[string]interface{}{
- "note": "No source files discovered",
- },
- }
- }
- // If only one scan type, return it directly
- if len(scanNodes) == 1 {
- return scanNodes[0]
- }
- // Multiple scan types - need merge operation
- return &MergeOperationNode{
- OperationType: "chronological_merge",
- Description: "Chronological Merge (time-ordered)",
- Children: scanNodes,
- Details: map[string]interface{}{
- "merge_strategy": "timestamp_based",
- "sources_count": len(scanNodes),
- },
- }
- }
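- // Illustrative sketch (hypothetical file names): for a topic with two parquet
- // files and one live log file, the tree assembled above has the shape
- //
- //	MergeOperationNode (chronological_merge)
- //	├── ScanOperationNode (parquet_scan, 2 files)
- //	│   ├── FileSourceNode (.../0001.parquet)
- //	│   └── FileSourceNode (.../0002.parquet)
- //	└── ScanOperationNode (live_log_scan, 1 file)
- //	    └── FileSourceNode (.../log-000123)
- //
- // One scan node is created per source type; the merge node appears only when
- // two or more scan types are present.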
- // extractPredicateStrings extracts predicate descriptions from WHERE clause
- func (e *SQLEngine) extractPredicateStrings(expr ExprNode) []string {
- var predicates []string
- e.extractPredicateStringsRecursive(expr, &predicates)
- return predicates
- }
- func (e *SQLEngine) extractPredicateStringsRecursive(expr ExprNode, predicates *[]string) {
- switch exprType := expr.(type) {
- case *ComparisonExpr:
- *predicates = append(*predicates, fmt.Sprintf("%s %s %s",
- e.exprToString(exprType.Left), exprType.Operator, e.exprToString(exprType.Right)))
- case *IsNullExpr:
- *predicates = append(*predicates, fmt.Sprintf("%s IS NULL", e.exprToString(exprType.Expr)))
- case *IsNotNullExpr:
- *predicates = append(*predicates, fmt.Sprintf("%s IS NOT NULL", e.exprToString(exprType.Expr)))
- case *AndExpr:
- e.extractPredicateStringsRecursive(exprType.Left, predicates)
- e.extractPredicateStringsRecursive(exprType.Right, predicates)
- case *OrExpr:
- e.extractPredicateStringsRecursive(exprType.Left, predicates)
- e.extractPredicateStringsRecursive(exprType.Right, predicates)
- case *ParenExpr:
- e.extractPredicateStringsRecursive(exprType.Expr, predicates)
- }
- }
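- // Example (illustrative): for WHERE id > 100 AND name IS NOT NULL, the recursion
- // above flattens the boolean structure into display strings roughly like
- //
- //	["id > *engine.SQLVal", "name IS NOT NULL"]
- //
- // because exprToString below renders only column names and falls back to the Go
- // type name (%T) for other operands. AND/OR nesting is not preserved: these
- // strings are display hints for EXPLAIN output, not executable predicates.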
- func (e *SQLEngine) exprToString(expr ExprNode) string {
- switch exprType := expr.(type) {
- case *ColName:
- return exprType.Name.String()
- default:
- // For now, return a simplified representation
- return fmt.Sprintf("%T", expr)
- }
- }
- // determineParquetOperations determines what operations will be performed on parquet files
- func (e *SQLEngine) determineParquetOperations(plan *QueryExecutionPlan, filePath string) []string {
- var operations []string
- // Check for column projection
- if contains(plan.OptimizationsUsed, "column_projection") {
- operations = append(operations, "column_projection")
- }
- // Check for predicate pushdown
- if contains(plan.OptimizationsUsed, "predicate_pushdown") {
- operations = append(operations, "predicate_pushdown")
- }
- // Check for statistics usage; exactly one branch fires, so the returned
- // slice is never empty
- if contains(plan.OptimizationsUsed, "parquet_statistics") || plan.ExecutionStrategy == "hybrid_fast_path" {
- operations = append(operations, "statistics_skip")
- } else {
- operations = append(operations, "row_group_scan")
- }
- return operations
- }
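- // Example (illustrative): with OptimizationsUsed = ["column_projection",
- // "predicate_pushdown"] and ExecutionStrategy = "hybrid_fast_path", this returns
- //
- //	["column_projection", "predicate_pushdown", "statistics_skip"]
- //
- // A plan with no listed optimizations and a non-fast-path strategy yields
- // ["row_group_scan"].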
- // determineLiveLogOperations determines what operations will be performed on live log files
- func (e *SQLEngine) determineLiveLogOperations(plan *QueryExecutionPlan, filePath string) []string {
- var operations []string
- // Live logs typically require sequential scan
- operations = append(operations, "sequential_scan")
- // Check for predicate filtering
- if contains(plan.OptimizationsUsed, "predicate_pushdown") {
- operations = append(operations, "predicate_filtering")
- }
- return operations
- }
- // determineOptimizationHint determines the optimization hint for a data source
- func (e *SQLEngine) determineOptimizationHint(plan *QueryExecutionPlan, sourceType string) string {
- switch plan.ExecutionStrategy {
- case "hybrid_fast_path":
- if sourceType == "parquet" {
- return "statistics_only"
- }
- return "minimal_scan"
- case "full_scan":
- return "full_scan"
- case "column_projection":
- return "column_filter"
- default:
- return ""
- }
- }
- // Helper function to check if slice contains string
- func contains(slice []string, item string) bool {
- for _, s := range slice {
- if s == item {
- return true
- }
- }
- return false
- }
- // collectLiveLogFileNames collects live log file names from a partition directory
- func (e *SQLEngine) collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
- var liveLogFiles []string
- err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // List all files in partition directory
- request := &filer_pb.ListEntriesRequest{
- Directory: partitionPath,
- Prefix: "",
- StartFromFileName: "",
- InclusiveStartFrom: false,
- Limit: 10000, // reasonable limit
- }
- stream, err := client.ListEntries(context.Background(), request)
- if err != nil {
- return err
- }
- for {
- resp, err := stream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return err
- }
- entry := resp.Entry
- if entry != nil && !entry.IsDirectory {
- // Check if this is a log file (not a parquet file)
- fileName := entry.Name
- if !strings.HasSuffix(fileName, ".parquet") && !strings.HasSuffix(fileName, ".metadata") {
- liveLogFiles = append(liveLogFiles, fileName)
- }
- }
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- return liveLogFiles, nil
- }
- // formatOptimization provides user-friendly names for optimizations
- func (e *SQLEngine) formatOptimization(opt string) string {
- switch opt {
- case "parquet_statistics":
- return "Parquet Statistics Usage"
- case "live_log_counting":
- return "Live Log Row Counting"
- case "deduplication":
- return "Duplicate Data Avoidance"
- case "predicate_pushdown":
- return "WHERE Clause Pushdown"
- case "column_statistics_pruning":
- return "Column Statistics File Pruning"
- case "column_projection":
- return "Column Selection"
- case "limit_pushdown":
- return "LIMIT Optimization"
- default:
- return opt
- }
- }
- // executeUseStatement handles USE database statements to switch current database context
- func (e *SQLEngine) executeUseStatement(ctx context.Context, stmt *UseStatement) (*QueryResult, error) {
- // Validate database name
- if stmt.Database == "" {
- err := fmt.Errorf("database name cannot be empty")
- return &QueryResult{Error: err}, err
- }
- // Set the current database in the catalog
- e.catalog.SetCurrentDatabase(stmt.Database)
- // Return success message
- result := &QueryResult{
- Columns: []string{"message"},
- Rows: [][]sqltypes.Value{
- {sqltypes.MakeString([]byte(fmt.Sprintf("Database changed to: %s", stmt.Database)))},
- },
- Error: nil,
- }
- return result, nil
- }
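- // Example (illustrative): executing "USE analytics" yields a single-row result
- //
- //	message
- //	-------------------------------
- //	Database changed to: analytics
- //
- // after which unqualified table names resolve against "analytics".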
- // executeDDLStatement handles CREATE operations only
- // Note: ALTER TABLE and DROP TABLE are not supported to protect topic data
- func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) {
- switch stmt.Action {
- case CreateStr:
- return e.createTable(ctx, stmt)
- case AlterStr:
- err := fmt.Errorf("ALTER TABLE is not supported")
- return &QueryResult{Error: err}, err
- case DropStr:
- err := fmt.Errorf("DROP TABLE is not supported")
- return &QueryResult{Error: err}, err
- default:
- err := fmt.Errorf("unsupported DDL action: %s", stmt.Action)
- return &QueryResult{Error: err}, err
- }
- }
- // executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
- func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
- // Initialize plan details once
- if plan != nil && plan.Details == nil {
- plan.Details = make(map[string]interface{})
- }
- // Parse aggregations to populate plan
- var aggregations []AggregationSpec
- hasAggregations := false
- selectAll := false
- for _, selectExpr := range stmt.SelectExprs {
- switch expr := selectExpr.(type) {
- case *StarExpr:
- selectAll = true
- case *AliasedExpr:
- switch col := expr.Expr.(type) {
- case *FuncExpr:
- // This is an aggregation function
- aggSpec, err := e.parseAggregationFunction(col, expr)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- if aggSpec != nil {
- aggregations = append(aggregations, *aggSpec)
- hasAggregations = true
- plan.Aggregations = append(plan.Aggregations, aggSpec.Function+"("+aggSpec.Column+")")
- }
- }
- }
- }
- // Execute the query (handle aggregations specially for plan tracking)
- var result *QueryResult
- var err error
- if hasAggregations {
- // Extract table information for aggregation execution
- var database, tableName string
- if len(stmt.From) == 1 {
- if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
- if tableExpr, ok := table.Expr.(TableName); ok {
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
- }
- }
- }
- // Use current database if not specified
- if database == "" {
- database = e.catalog.currentDatabase
- if database == "" {
- database = "default"
- }
- }
- // Create hybrid scanner for aggregation execution
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- filerClient, err = e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- // Execute aggregation query with plan tracking
- result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
- } else {
- // Regular SELECT query with plan tracking
- result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
- }
- if err == nil && result != nil {
- // Extract table name (and database) for use in execution strategy determination
- var database, tableName string
- if len(stmt.From) == 1 {
- if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
- if tableExpr, ok := table.Expr.(TableName); ok {
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
- }
- }
- }
- // Use current database if not specified (previously hardcoded to "test")
- if database == "" {
- database = e.catalog.currentDatabase
- if database == "" {
- database = "default"
- }
- }
- // Try to get topic information for partition count and row processing stats
- if tableName != "" {
- // Try to discover partitions for statistics
- if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
- plan.PartitionsScanned = len(partitions)
- }
- // For aggregations, determine actual processing based on execution strategy
- if hasAggregations {
- plan.Details["results_returned"] = len(result.Rows)
- // Determine actual work done based on execution strategy
- if stmt.Where == nil {
- // Use the same logic as actual execution to determine if fast path was used
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- filerClient, _ = e.catalog.brokerClient.GetFilerClient()
- }
- hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
- var canUseFastPath bool
- if scannerErr == nil {
- // Test if fast path can be used (same as actual execution)
- _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
- canUseFastPath = canOptimize
- } else {
- // Fallback to simple check
- canUseFastPath = true
- for _, spec := range aggregations {
- if !e.canUseParquetStatsForAggregation(spec) {
- canUseFastPath = false
- break
- }
- }
- }
- if canUseFastPath {
- // Fast path: minimal scanning (only live logs that weren't converted)
- if actualScanCount, countErr := e.getActualRowsScannedForFastPath(ctx, database, tableName); countErr == nil {
- plan.TotalRowsProcessed = actualScanCount
- } else {
- plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning
- }
- } else {
- // Full scan: count all rows
- if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
- plan.TotalRowsProcessed = actualRowCount
- } else {
- plan.TotalRowsProcessed = int64(len(result.Rows))
- plan.Details["note"] = "scan_count_unavailable"
- }
- }
- } else {
- // With WHERE clause: full scan required
- if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
- plan.TotalRowsProcessed = actualRowCount
- } else {
- plan.TotalRowsProcessed = int64(len(result.Rows))
- plan.Details["note"] = "scan_count_unavailable"
- }
- }
- } else {
- // For non-aggregations, result count is meaningful
- plan.TotalRowsProcessed = int64(len(result.Rows))
- }
- }
- // Determine execution strategy based on query type (reuse fast path detection from above)
- if hasAggregations {
- // Skip execution strategy determination if plan was already populated by aggregation execution
- // This prevents overwriting the correctly built plan from BuildAggregationPlan
- if plan.ExecutionStrategy == "" {
- // For aggregations, determine if fast path conditions are met
- if stmt.Where == nil {
- // Reuse the same logic used above for row counting
- var canUseFastPath bool
- if tableName != "" {
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- filerClient, _ = e.catalog.brokerClient.GetFilerClient()
- }
- if filerClient != nil {
- hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
- if scannerErr == nil {
- // Test if fast path can be used (same as actual execution)
- _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
- canUseFastPath = canOptimize
- } else {
- canUseFastPath = false
- }
- } else {
- // Fallback check
- canUseFastPath = true
- for _, spec := range aggregations {
- if !e.canUseParquetStatsForAggregation(spec) {
- canUseFastPath = false
- break
- }
- }
- }
- } else {
- canUseFastPath = false
- }
- if canUseFastPath {
- plan.ExecutionStrategy = "hybrid_fast_path"
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "parquet_statistics", "live_log_counting", "deduplication")
- plan.DataSources = []string{"parquet_stats", "live_logs"}
- } else {
- plan.ExecutionStrategy = "full_scan"
- plan.DataSources = []string{"live_logs", "parquet_files"}
- }
- } else {
- plan.ExecutionStrategy = "full_scan"
- plan.DataSources = []string{"live_logs", "parquet_files"}
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
- }
- }
- } else {
- // For regular SELECT queries
- if selectAll {
- plan.ExecutionStrategy = "hybrid_scan"
- plan.DataSources = []string{"live_logs", "parquet_files"}
- } else {
- plan.ExecutionStrategy = "column_projection"
- plan.DataSources = []string{"live_logs", "parquet_files"}
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_projection")
- }
- }
- // Add WHERE clause information
- if stmt.Where != nil {
- // Only add predicate_pushdown if not already added
- if !contains(plan.OptimizationsUsed, "predicate_pushdown") {
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
- }
- plan.Details["where_clause"] = "present"
- }
- // Add LIMIT information
- if stmt.Limit != nil {
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "limit_pushdown")
- if stmt.Limit.Rowcount != nil {
- if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
- plan.Details["limit"] = string(limitExpr.Val)
- }
- }
- }
- }
- // Build execution tree after all plan details are populated
- if err == nil && result != nil && plan != nil {
- plan.RootNode = e.buildExecutionTree(plan, stmt)
- }
- return result, err
- }
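- // Illustrative sketch of the plan populated above for "SELECT COUNT(*) FROM t"
- // without a WHERE clause, assuming parquet statistics cover every aggregation:
- //
- //	ExecutionStrategy:  "hybrid_fast_path"
- //	OptimizationsUsed:  ["parquet_statistics", "live_log_counting", "deduplication"]
- //	DataSources:        ["parquet_stats", "live_logs"]
- //	TotalRowsProcessed: live-log rows actually scanned (0 if stats alone sufficed)
- //
- // A WHERE clause forces "full_scan" and appends "predicate_pushdown"; a LIMIT
- // clause appends "limit_pushdown".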
- // executeSelectStatement handles SELECT queries
- // Assumptions:
- // 1. Queries run against both live log files and Parquet files in MQ topics
- // 2. Predicate pushdown is used for efficiency
- // 3. Only single-table queries are supported; cross-topic joins are not yet available
- func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStatement) (*QueryResult, error) {
- // Parse FROM clause to get table (topic) information
- if len(stmt.From) != 1 {
- err := fmt.Errorf("SELECT supports single table queries only")
- return &QueryResult{Error: err}, err
- }
- // Extract table reference
- var database, tableName string
- switch table := stmt.From[0].(type) {
- case *AliasedTableExpr:
- switch tableExpr := table.Expr.(type) {
- case TableName:
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
- default:
- err := fmt.Errorf("unsupported table expression: %T", tableExpr)
- return &QueryResult{Error: err}, err
- }
- default:
- err := fmt.Errorf("unsupported FROM clause: %T", table)
- return &QueryResult{Error: err}, err
- }
- // Use current database context if not specified
- if database == "" {
- database = e.catalog.GetCurrentDatabase()
- if database == "" {
- database = "default"
- }
- }
- // Auto-discover and register topic if not already in catalog
- if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
- // Topic not in catalog, try to discover and register it
- if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
- // Return error immediately for non-existent topics instead of falling back to sample data
- return &QueryResult{Error: regErr}, regErr
- }
- }
- // Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
- // Get filerClient from broker connection (works with both real and mock brokers)
- filerClient, filerClientErr := e.catalog.brokerClient.GetFilerClient()
- if filerClientErr != nil {
- // Return error if filer client is not available for topic access
- return &QueryResult{Error: filerClientErr}, filerClientErr
- }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
- if err != nil {
- // Handle quiet topics gracefully: topics exist but have no active schema/brokers
- if IsNoSchemaError(err) {
- // Return empty result for quiet topics (normal in production environments)
- return &QueryResult{
- Columns: []string{},
- Rows: [][]sqltypes.Value{},
- Database: database,
- Table: tableName,
- }, nil
- }
- // Return error for other access issues (truly non-existent topics, etc.)
- topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
- return &QueryResult{Error: topicErr}, topicErr
- }
- // Parse SELECT columns and detect aggregation functions
- var columns []string
- var aggregations []AggregationSpec
- selectAll := false
- hasAggregations := false
- // Track required base columns for arithmetic expressions
- baseColumnsSet := make(map[string]bool)
- for _, selectExpr := range stmt.SelectExprs {
- switch expr := selectExpr.(type) {
- case *StarExpr:
- selectAll = true
- case *AliasedExpr:
- switch col := expr.Expr.(type) {
- case *ColName:
- colName := col.Name.String()
- // Check if this "column" is actually an arithmetic expression with functions
- if arithmeticExpr := e.parseColumnLevelCalculation(colName); arithmeticExpr != nil {
- columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
- e.extractBaseColumns(arithmeticExpr, baseColumnsSet)
- } else {
- columns = append(columns, colName)
- baseColumnsSet[colName] = true
- }
- case *ArithmeticExpr:
- // Handle arithmetic expressions like id+user_id and string concatenation like name||suffix
- columns = append(columns, e.getArithmeticExpressionAlias(col))
- // Extract base columns needed for this arithmetic expression
- e.extractBaseColumns(col, baseColumnsSet)
- case *SQLVal:
- // Handle string/numeric literals like 'good', 123, etc.
- columns = append(columns, e.getSQLValAlias(col))
- case *FuncExpr:
- // Distinguish between aggregation functions and string functions
- funcName := strings.ToUpper(col.Name.String())
- if e.isAggregationFunction(funcName) {
- // Handle aggregation functions
- aggSpec, err := e.parseAggregationFunction(col, expr)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- aggregations = append(aggregations, *aggSpec)
- hasAggregations = true
- } else if e.isStringFunction(funcName) {
- // Handle string functions like UPPER, LENGTH, etc.
- columns = append(columns, e.getStringFunctionAlias(col))
- // Extract base columns needed for this string function
- e.extractBaseColumnsFromFunction(col, baseColumnsSet)
- } else if e.isDateTimeFunction(funcName) {
- // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
- columns = append(columns, e.getDateTimeFunctionAlias(col))
- // Extract base columns needed for this datetime function
- e.extractBaseColumnsFromFunction(col, baseColumnsSet)
- } else {
- return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
- }
- default:
- err := fmt.Errorf("unsupported SELECT expression: %T", col)
- return &QueryResult{Error: err}, err
- }
- default:
- err := fmt.Errorf("unsupported SELECT expression: %T", expr)
- return &QueryResult{Error: err}, err
- }
- }
- // If we have aggregations, use aggregation query path
- if hasAggregations {
- return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
- }
- // Parse WHERE clause for predicate pushdown
- var predicate func(*schema_pb.RecordValue) bool
- if stmt.Where != nil {
- predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- }
- // Parse LIMIT and OFFSET clauses
- // Use -1 to distinguish "no LIMIT" from "LIMIT 0"
- limit := -1
- offset := 0
- if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
- switch limitExpr := stmt.Limit.Rowcount.(type) {
- case *SQLVal:
- if limitExpr.Type == IntVal {
- limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
- if parseErr != nil {
- return &QueryResult{Error: parseErr}, parseErr
- }
- if limit64 > math.MaxInt32 || limit64 < 0 {
- return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
- }
- limit = int(limit64)
- }
- }
- }
- // Parse OFFSET clause if present
- if stmt.Limit != nil && stmt.Limit.Offset != nil {
- switch offsetExpr := stmt.Limit.Offset.(type) {
- case *SQLVal:
- if offsetExpr.Type == IntVal {
- offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
- if parseErr != nil {
- return &QueryResult{Error: parseErr}, parseErr
- }
- if offset64 > math.MaxInt32 || offset64 < 0 {
- return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
- }
- offset = int(offset64)
- }
- }
- }
- // Build hybrid scan options
- // Extract time filters from WHERE clause to optimize scanning
- startTimeNs, stopTimeNs := int64(0), int64(0)
- if stmt.Where != nil {
- startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
- }
- hybridScanOptions := HybridScanOptions{
- StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
- StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
- Limit: limit,
- Offset: offset,
- Predicate: predicate,
- }
- if !selectAll {
- // Convert baseColumnsSet to slice for hybrid scan options
- baseColumns := make([]string, 0, len(baseColumnsSet))
- for columnName := range baseColumnsSet {
- baseColumns = append(baseColumns, columnName)
- }
- // Use base columns (not expression aliases) for data retrieval
- if len(baseColumns) > 0 {
- hybridScanOptions.Columns = baseColumns
- } else {
- // If no base columns found (shouldn't happen), use original columns
- hybridScanOptions.Columns = columns
- }
- }
- // Execute the hybrid scan (live logs + Parquet files)
- results, err := hybridScanner.Scan(ctx, hybridScanOptions)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- // Convert to SQL result format
- if selectAll {
- if len(columns) > 0 {
- // SELECT *, specific_columns - include both auto-discovered and explicit columns
- return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
- } else {
- // SELECT * only - let converter determine all columns (excludes system columns)
- columns = nil
- return hybridScanner.ConvertToSQLResult(results, columns), nil
- }
- }
- // Handle custom column expressions (including arithmetic)
- return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
- }
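- // Illustrative sketch: for "SELECT id, UPPER(name) FROM t WHERE ts > X LIMIT 10 OFFSET 5"
- // (assuming ts is recognized as a timestamp column), the scan above receives roughly
- //
- //	HybridScanOptions{
- //		StartTimeNs: X,
- //		Limit:       10,
- //		Offset:      5,
- //		Predicate:   <compiled WHERE closure>,
- //		Columns:     ["id", "name"], // base columns, not expression aliases
- //	}
- //
- // so column projection and time pruning are pushed into the scanner rather than
- // applied to materialized rows.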
- // executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
- // This is used by EXPLAIN queries to capture complete data source information including broker memory
- func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
- // Parse FROM clause to get table (topic) information
- if len(stmt.From) != 1 {
- err := fmt.Errorf("SELECT supports single table queries only")
- return &QueryResult{Error: err}, err
- }
- // Extract table reference
- var database, tableName string
- switch table := stmt.From[0].(type) {
- case *AliasedTableExpr:
- switch tableExpr := table.Expr.(type) {
- case TableName:
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
- default:
- err := fmt.Errorf("unsupported table expression: %T", tableExpr)
- return &QueryResult{Error: err}, err
- }
- default:
- err := fmt.Errorf("unsupported FROM clause: %T", table)
- return &QueryResult{Error: err}, err
- }
- // Use current database context if not specified
- if database == "" {
- database = e.catalog.GetCurrentDatabase()
- if database == "" {
- database = "default"
- }
- }
- // Auto-discover and register topic if not already in catalog
- if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
- // Topic not in catalog, try to discover and register it
- if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
- // Return error immediately for non-existent topics instead of falling back to sample data
- return &QueryResult{Error: regErr}, regErr
- }
- }
- // Create HybridMessageScanner for the topic (reads both live logs + Parquet files)
- // Get filerClient from broker connection (works with both real and mock brokers)
- filerClient, filerClientErr := e.catalog.brokerClient.GetFilerClient()
- if filerClientErr != nil {
- // Return error if filer client is not available for topic access
- return &QueryResult{Error: filerClientErr}, filerClientErr
- }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
- if err != nil {
- // Handle quiet topics gracefully: topics exist but have no active schema/brokers
- if IsNoSchemaError(err) {
- // Return empty result for quiet topics (normal in production environments)
- return &QueryResult{
- Columns: []string{},
- Rows: [][]sqltypes.Value{},
- Database: database,
- Table: tableName,
- }, nil
- }
- // Return error for other access issues (truly non-existent topics, etc.)
- topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err)
- return &QueryResult{Error: topicErr}, topicErr
- }
- // Parse SELECT columns and detect aggregation functions
- var columns []string
- var aggregations []AggregationSpec
- selectAll := false
- hasAggregations := false
- // Track required base columns for arithmetic expressions
- baseColumnsSet := make(map[string]bool)
- for _, selectExpr := range stmt.SelectExprs {
- switch expr := selectExpr.(type) {
- case *StarExpr:
- selectAll = true
- case *AliasedExpr:
- switch col := expr.Expr.(type) {
- case *ColName:
- colName := col.Name.String()
- columns = append(columns, colName)
- baseColumnsSet[colName] = true
- case *ArithmeticExpr:
- // Handle arithmetic expressions like id+user_id and string concatenation like name||suffix
- columns = append(columns, e.getArithmeticExpressionAlias(col))
- // Extract base columns needed for this arithmetic expression
- e.extractBaseColumns(col, baseColumnsSet)
- case *SQLVal:
- // Handle string/numeric literals like 'good', 123, etc.
- columns = append(columns, e.getSQLValAlias(col))
- case *FuncExpr:
- // Distinguish between aggregation functions and string functions
- funcName := strings.ToUpper(col.Name.String())
- if e.isAggregationFunction(funcName) {
- // Handle aggregation functions
- aggSpec, err := e.parseAggregationFunction(col, expr)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- aggregations = append(aggregations, *aggSpec)
- hasAggregations = true
- } else if e.isStringFunction(funcName) {
- // Handle string functions like UPPER, LENGTH, etc.
- columns = append(columns, e.getStringFunctionAlias(col))
- // Extract base columns needed for this string function
- e.extractBaseColumnsFromFunction(col, baseColumnsSet)
- } else if e.isDateTimeFunction(funcName) {
- // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
- columns = append(columns, e.getDateTimeFunctionAlias(col))
- // Extract base columns needed for this datetime function
- e.extractBaseColumnsFromFunction(col, baseColumnsSet)
- } else {
- return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
- }
- default:
- err := fmt.Errorf("unsupported SELECT expression: %T", col)
- return &QueryResult{Error: err}, err
- }
- default:
- err := fmt.Errorf("unsupported SELECT expression: %T", expr)
- return &QueryResult{Error: err}, err
- }
- }
- // If we have aggregations, use aggregation query path
- if hasAggregations {
- return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
- }
- // Parse WHERE clause for predicate pushdown
- var predicate func(*schema_pb.RecordValue) bool
- if stmt.Where != nil {
- predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- }
- // Parse LIMIT and OFFSET clauses
- // Use -1 to distinguish "no LIMIT" from "LIMIT 0"
- limit := -1
- offset := 0
- if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
- switch limitExpr := stmt.Limit.Rowcount.(type) {
- case *SQLVal:
- if limitExpr.Type == IntVal {
- limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
- if parseErr != nil {
- return &QueryResult{Error: parseErr}, parseErr
- }
- if limit64 > math.MaxInt32 || limit64 < 0 {
- return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
- }
- limit = int(limit64)
- }
- }
- }
- // Parse OFFSET clause if present
- if stmt.Limit != nil && stmt.Limit.Offset != nil {
- switch offsetExpr := stmt.Limit.Offset.(type) {
- case *SQLVal:
- if offsetExpr.Type == IntVal {
- offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
- if parseErr != nil {
- return &QueryResult{Error: parseErr}, parseErr
- }
- if offset64 > math.MaxInt32 || offset64 < 0 {
- return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
- }
- offset = int(offset64)
- }
- }
- }
- // Build hybrid scan options
- // Extract time filters from WHERE clause to optimize scanning
- startTimeNs, stopTimeNs := int64(0), int64(0)
- if stmt.Where != nil {
- startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
- }
- hybridScanOptions := HybridScanOptions{
- StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
- StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
- Limit: limit,
- Offset: offset,
- Predicate: predicate,
- }
- if !selectAll {
- // Convert baseColumnsSet to slice for hybrid scan options
- baseColumns := make([]string, 0, len(baseColumnsSet))
- for columnName := range baseColumnsSet {
- baseColumns = append(baseColumns, columnName)
- }
- // Use base columns (not expression aliases) for data retrieval
- if len(baseColumns) > 0 {
- hybridScanOptions.Columns = baseColumns
- } else {
- // If no base columns found (shouldn't happen), use original columns
- hybridScanOptions.Columns = columns
- }
- }
- // Execute the hybrid scan with stats capture for EXPLAIN
- var results []HybridScanResult
- if plan != nil {
- // EXPLAIN mode - capture broker buffer stats
- var stats *HybridScanStats
- results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- // Populate plan with broker buffer information
- if stats != nil {
- plan.BrokerBufferQueried = stats.BrokerBufferQueried
- plan.BrokerBufferMessages = stats.BrokerBufferMessages
- plan.BufferStartIndex = stats.BufferStartIndex
- // Add broker_buffer to data sources if buffer was queried
- if stats.BrokerBufferQueried {
- // Add broker_buffer to data sources once (avoid duplicates)
- if !contains(plan.DataSources, "broker_buffer") {
- plan.DataSources = append(plan.DataSources, "broker_buffer")
- }
- }
- }
- // Populate execution plan details with source file information for Data Sources Tree
- if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
- // Add partition paths to execution plan details
- plan.Details["partition_paths"] = partitions
- // Persist time filter details for downstream pruning/diagnostics
- plan.Details[PlanDetailStartTimeNs] = startTimeNs
- plan.Details[PlanDetailStopTimeNs] = stopTimeNs
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
- }
- // Collect actual file information for each partition
- var parquetFiles []string
- var liveLogFiles []string
- parquetSources := make(map[string]bool)
- var parquetReadErrors []string
- var liveLogListErrors []string
- for _, partitionPath := range partitions {
- // Get parquet files for this partition
- if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- // Prune files by time range with debug logging
- filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
- // Further prune by column statistics from WHERE clause
- if stmt.Where != nil {
- beforeColumnPrune := len(filteredStats)
- filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
- columnPrunedCount := beforeColumnPrune - len(filteredStats)
- if columnPrunedCount > 0 {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
- // Track column statistics optimization
- if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
- }
- }
- }
- for _, stats := range filteredStats {
- parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
- }
- } else {
- parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
- }
- // Merge accurate parquet sources from metadata
- if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
- for src := range sources {
- parquetSources[src] = true
- }
- }
- // Get live log files for this partition
- if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
- for _, fileName := range liveFiles {
- // Exclude live log files that have been converted to parquet (deduplicated)
- if parquetSources[fileName] {
- continue
- }
- liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
- }
- } else {
- liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
- }
- }
- if len(parquetFiles) > 0 {
- plan.Details["parquet_files"] = parquetFiles
- }
- if len(liveLogFiles) > 0 {
- plan.Details["live_log_files"] = liveLogFiles
- }
- if len(parquetReadErrors) > 0 {
- plan.Details["error_parquet_statistics"] = parquetReadErrors
- }
- if len(liveLogListErrors) > 0 {
- plan.Details["error_live_log_listing"] = liveLogListErrors
- }
- // Update scan statistics for execution plan display
- plan.PartitionsScanned = len(partitions)
- plan.ParquetFilesScanned = len(parquetFiles)
- plan.LiveLogFilesScanned = len(liveLogFiles)
- } else {
- // Handle partition discovery error
- plan.Details["error_partition_discovery"] = discoverErr.Error()
- }
- } else {
- // Normal mode - just get results
- results, err = hybridScanner.Scan(ctx, hybridScanOptions)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- }
- // Convert to SQL result format
- if selectAll {
- if len(columns) > 0 {
- // SELECT *, specific_columns - include both auto-discovered and explicit columns
- return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
- } else {
- // SELECT * only - let converter determine all columns (excludes system columns)
- columns = nil
- return hybridScanner.ConvertToSQLResult(results, columns), nil
- }
- }
- // Handle custom column expressions (including arithmetic)
- return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
- }
- // extractTimeFilters extracts time range filters from WHERE clause for optimization
- // This allows push-down of time-based queries to improve scan performance
- // Returns (startTimeNs, stopTimeNs) where 0 means unbounded
- func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) {
- startTimeNs, stopTimeNs := int64(0), int64(0)
- // Recursively extract time filters from expression tree
- e.extractTimeFiltersRecursive(expr, &startTimeNs, &stopTimeNs)
- // Special case: if startTimeNs == stopTimeNs, treat it like an equality query
- // to avoid premature scan termination. The predicate will handle exact matching.
- if startTimeNs != 0 && startTimeNs == stopTimeNs {
- stopTimeNs = 0
- }
- return startTimeNs, stopTimeNs
- }
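- // Example (illustrative, assuming ts is recognized by isTimestampColumn):
- //
- //	WHERE ts >= 1000 AND ts < 2000  ->  startTimeNs=1000, stopTimeNs=2000
- //	WHERE ts > 500 OR id = 5        ->  startTimeNs=0,    stopTimeNs=0
- //
- // OR branches are skipped entirely, so the scan stays unbounded and row-level
- // filtering falls back to the compiled predicate.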
- // extractTimeFiltersWithValidation extracts time filters and validates that WHERE clause contains only time-based predicates
- // Returns (startTimeNs, stopTimeNs, onlyTimePredicates) where onlyTimePredicates indicates if fast path is safe
- func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) {
- startTimeNs, stopTimeNs := int64(0), int64(0)
- onlyTimePredicates := true
- // Recursively extract time filters and validate predicates
- e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates)
- // Special case: if startTimeNs == stopTimeNs, treat it like an equality query
- if startTimeNs != 0 && startTimeNs == stopTimeNs {
- stopTimeNs = 0
- }
- return startTimeNs, stopTimeNs, onlyTimePredicates
- }
- // extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons
- func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) {
- switch exprType := expr.(type) {
- case *ComparisonExpr:
- e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs)
- case *AndExpr:
- // For AND expressions, combine time filters (intersection)
- e.extractTimeFiltersRecursive(exprType.Left, startTimeNs, stopTimeNs)
- e.extractTimeFiltersRecursive(exprType.Right, startTimeNs, stopTimeNs)
- case *OrExpr:
- // For OR expressions, we can't easily optimize time ranges
- // Skip time filter extraction for OR clauses to avoid incorrect results
- return
- case *ParenExpr:
- // Unwrap parentheses and continue
- e.extractTimeFiltersRecursive(exprType.Expr, startTimeNs, stopTimeNs)
- }
- }
- // extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates
- func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) {
- switch exprType := expr.(type) {
- case *ComparisonExpr:
- // Check if this is a time-based comparison
- leftCol := e.getColumnName(exprType.Left)
- rightCol := e.getColumnName(exprType.Right)
- isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol)
- if isTimeComparison {
- // Extract time filter from this comparison
- e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs)
- } else {
- // Non-time predicate found - fast path is not safe
- *onlyTimePredicates = false
- }
- case *AndExpr:
- // For AND expressions, both sides must be time-only for fast path to be safe
- e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates)
- e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates)
- case *OrExpr:
- // OR expressions are complex and not supported in fast path
- *onlyTimePredicates = false
- return
- case *ParenExpr:
- // Unwrap parentheses and continue
- e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates)
- default:
- // Unknown expression type - not safe for fast path
- *onlyTimePredicates = false
- }
- }
- // extractTimeFromComparison extracts time bounds from comparison expressions
- // Handles comparisons against timestamp columns (system columns and schema-defined timestamp types)
- func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) {
- // Check if this is a time-related column comparison
- leftCol := e.getColumnName(comp.Left)
- rightCol := e.getColumnName(comp.Right)
- var valueExpr ExprNode
- var reversed bool
- // Determine which side is the time column (using schema types)
- if e.isTimestampColumn(leftCol) {
- valueExpr = comp.Right
- reversed = false
- } else if e.isTimestampColumn(rightCol) {
- valueExpr = comp.Left
- reversed = true
- } else {
- // Not a time comparison
- return
- }
- // Extract the time value
- timeValue := e.extractTimeValue(valueExpr)
- if timeValue == 0 {
- // Couldn't parse time value
- return
- }
- // Apply the comparison operator to determine time bounds
- operator := comp.Operator
- if reversed {
- // Reverse the operator if column and value are swapped
- operator = e.reverseOperator(operator)
- }
- switch operator {
- case GreaterThanStr: // timestamp > value
- if *startTimeNs == 0 || timeValue > *startTimeNs {
- *startTimeNs = timeValue
- }
- case GreaterEqualStr: // timestamp >= value
- if *startTimeNs == 0 || timeValue >= *startTimeNs {
- *startTimeNs = timeValue
- }
- case LessThanStr: // timestamp < value
- if *stopTimeNs == 0 || timeValue < *stopTimeNs {
- *stopTimeNs = timeValue
- }
- case LessEqualStr: // timestamp <= value
- if *stopTimeNs == 0 || timeValue <= *stopTimeNs {
- *stopTimeNs = timeValue
- }
- case EqualStr: // timestamp = value (point query)
- // For exact matches, we set startTimeNs slightly before the target
- // This works around a scan boundary bug where >= X starts after X instead of at X
- // The predicate function will handle exact matching
- *startTimeNs = timeValue - 1
- // Do NOT set stopTimeNs - let the predicate handle exact matching
- }
- }
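- // Example (illustrative): "WHERE 1000 <= ts" arrives with the value on the left,
- // so the operator is reversed to ">=" before the switch above and startTimeNs
- // becomes 1000. An equality "ts = X" sets startTimeNs to X-1 and leaves stopTimeNs
- // unbounded (see the workaround comment), so the scan reaches X and the row-level
- // predicate performs the exact match.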
- // isTimestampColumn checks if a column is a timestamp using schema type information
- func (e *SQLEngine) isTimestampColumn(columnName string) bool {
- if columnName == "" {
- return false
- }
- // System timestamp columns are always time columns
- if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP {
- return true
- }
- // For user-defined columns, check actual schema type information
- if e.catalog != nil {
- currentDB := e.catalog.GetCurrentDatabase()
- if currentDB == "" {
- currentDB = "default"
- }
- // Get current table context from query execution
- // Note: This is a limitation - we need table context here
- // In a full implementation, this would be passed from the query context
- tableInfo, err := e.getCurrentTableInfo(currentDB)
- if err == nil && tableInfo != nil {
- for _, col := range tableInfo.Columns {
- if strings.EqualFold(col.Name, columnName) {
- // Use actual SQL type to determine if this is a timestamp
- return e.isSQLTypeTimestamp(col.Type)
- }
- }
- }
- }
- // Only return true if we have explicit type information
- // No guessing based on column names
- return false
- }
- // getTimeFiltersFromPlan extracts time filter values from execution plan details
- func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) {
- if plan == nil || plan.Details == nil {
- return 0, 0
- }
- if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok {
- if startNs, ok2 := startNsVal.(int64); ok2 {
- startTimeNs = startNs
- }
- }
- if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok {
- if stopNs, ok2 := stopNsVal.(int64); ok2 {
- stopTimeNs = stopNs
- }
- }
- return
- }
- // pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging
- func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
- if startTimeNs == 0 && stopTimeNs == 0 {
- return parquetStats
- }
- debugEnabled := ctx != nil && isDebugMode(ctx)
- qStart := startTimeNs
- qStop := stopTimeNs
- if qStop == 0 {
- qStop = math.MaxInt64
- }
- n := 0
- for _, fs := range parquetStats {
- if debugEnabled {
- fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
- }
- if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
- if debugEnabled {
- fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
- }
- if qStop < minNs || (qStart != 0 && qStart > maxNs) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
- }
- continue
- }
- } else if debugEnabled {
- fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
- }
- parquetStats[n] = fs
- n++
- }
- return parquetStats[:n]
- }
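- // Worked example (hypothetical file ranges) with qStart=100, qStop=200:
- //
- //	file A: min=50,  max=90   -> skipped (qStart > max)
- //	file B: min=150, max=250  -> kept    (ranges overlap)
- //	file C: min=300, max=400  -> skipped (qStop < min)
- //	file D: no stats          -> kept    (cannot prune safely)
- //
- // The slice is compacted in place, so callers should not reuse the original
- // backing array after this call.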
- // pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates
- func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats {
- if whereExpr == nil {
- return parquetStats
- }
- debugEnabled := ctx != nil && isDebugMode(ctx)
- n := 0
- for _, fs := range parquetStats {
- if e.canSkipParquetFile(ctx, fs, whereExpr) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
- }
- continue
- }
- parquetStats[n] = fs
- n++
- }
- return parquetStats[:n]
- }
- // canSkipParquetFile determines if a parquet file can be skipped based on column statistics
- func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool {
- switch expr := whereExpr.(type) {
- case *ComparisonExpr:
- return e.canSkipFileByComparison(ctx, fileStats, expr)
- case *AndExpr:
- // For AND: skip if ANY condition allows skipping (more aggressive pruning)
- return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right)
- case *OrExpr:
- // For OR: skip only if ALL conditions allow skipping (conservative)
- return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right)
- default:
- // Unknown expression type - don't skip
- return false
- }
- }
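- // Example (illustrative): for WHERE a > 10 AND b = 'x', the file is skipped if
- // statistics rule out either conjunct on its own; for WHERE a > 10 OR b = 'x',
- // both disjuncts must be provably empty, which is the conservative direction
- // for correctness.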
- // canSkipFileByComparison checks if a file can be skipped based on a comparison predicate
- func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool {
- // Extract column name and comparison value
- var columnName string
- var compareSchemaValue *schema_pb.Value
- operator := expr.Operator
- // Determine which side is the column and which is the value
- if colRef, ok := expr.Left.(*ColName); ok {
- columnName = colRef.Name.String()
- if sqlVal, ok := expr.Right.(*SQLVal); ok {
- compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
- } else {
- return false // Can't optimize complex expressions
- }
- } else if colRef, ok := expr.Right.(*ColName); ok {
- columnName = colRef.Name.String()
- if sqlVal, ok := expr.Left.(*SQLVal); ok {
- compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
- // Flip operator for reversed comparison
- operator = e.flipOperator(operator)
- } else {
- return false
- }
- } else {
- return false // No column reference found
- }
- // Validate comparison value
- if compareSchemaValue == nil {
- return false
- }
- // Get column statistics
- colStats, exists := fileStats.ColumnStats[columnName]
- if !exists || colStats == nil {
- // Try case-insensitive lookup
- for colName, stats := range fileStats.ColumnStats {
- if strings.EqualFold(colName, columnName) {
- colStats = stats
- exists = true
- break
- }
- }
- }
- if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil {
- return false // No statistics available
- }
- // Apply pruning logic based on operator
- switch operator {
- case ">":
- // Skip if max(column) <= compareValue
- return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0
- case ">=":
- // Skip if max(column) < compareValue
- return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0
- case "<":
- // Skip if min(column) >= compareValue
- return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0
- case "<=":
- // Skip if min(column) > compareValue
- return e.compareValues(colStats.MinValue, compareSchemaValue) > 0
- case "=":
- // Skip if compareValue is outside [min, max] range
- return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 ||
- e.compareValues(compareSchemaValue, colStats.MaxValue) > 0
- case "!=", "<>":
- // Skip if min == max == compareValue (all values are the same and equal to compareValue)
- return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 &&
- e.compareValues(colStats.MinValue, compareSchemaValue) == 0
- default:
- return false // Unknown operator
- }
- }
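- // Worked example (hypothetical statistics): a file whose "id" column has
- // min=5 and max=10 can be skipped for
- //
- //	id > 10   (max <= 10)
- //	id >= 11  (max < 11)
- //	id < 5    (min >= 5)
- //	id = 42   (42 outside [5, 10])
- //
- // but not for id != 7, since min != max means other values may be present.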
- // flipOperator flips comparison operators when operands are swapped
- func (e *SQLEngine) flipOperator(op string) string {
- switch op {
- case ">":
- return "<"
- case ">=":
- return "<="
- case "<":
- return ">"
- case "<=":
- return ">="
- case "=", "!=", "<>":
- return op // These are symmetric
- default:
- return op
- }
- }
- // populatePlanFileDetails populates execution plan with detailed file information for partitions
- // Includes column statistics pruning optimization when WHERE clause is provided
- func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
- debugEnabled := ctx != nil && isDebugMode(ctx)
- // Collect actual file information for each partition
- var parquetFiles []string
- var liveLogFiles []string
- parquetSources := make(map[string]bool)
- var parquetReadErrors []string
- var liveLogListErrors []string
- // Extract time filters from plan details
- startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan)
- for _, partitionPath := range partitions {
- // Get parquet files for this partition
- if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- // Prune files by time range
- filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
- // Further prune by column statistics from WHERE clause
- if stmt != nil && stmt.Where != nil {
- beforeColumnPrune := len(filteredStats)
- filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
- columnPrunedCount := beforeColumnPrune - len(filteredStats)
- if columnPrunedCount > 0 {
- if debugEnabled {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
- // Track column statistics optimization
- if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
- plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
- }
- }
- }
- for _, stats := range filteredStats {
- parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
- }
- } else {
- parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
- }
- // Merge accurate parquet sources from metadata
- if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
- for src := range sources {
- parquetSources[src] = true
- }
- }
- // Get live log files for this partition
- if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
- for _, fileName := range liveFiles {
- // Exclude live log files that have been converted to parquet (deduplicated)
- if parquetSources[fileName] {
- continue
- }
- liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
- }
- } else {
- liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
- }
- }
- // Add file lists to plan details
- if len(parquetFiles) > 0 {
- plan.Details["parquet_files"] = parquetFiles
- }
- if len(liveLogFiles) > 0 {
- plan.Details["live_log_files"] = liveLogFiles
- }
- if len(parquetReadErrors) > 0 {
- plan.Details["error_parquet_statistics"] = parquetReadErrors
- }
- if len(liveLogListErrors) > 0 {
- plan.Details["error_live_log_listing"] = liveLogListErrors
- }
- }
- // isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
- func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
- upperType := strings.ToUpper(strings.TrimSpace(sqlType))
- // Handle type with precision/length specifications
- if idx := strings.Index(upperType, "("); idx != -1 {
- upperType = upperType[:idx]
- }
- switch upperType {
- case "TIMESTAMP", "DATETIME":
- return true
- case "BIGINT":
- // BIGINT could be a timestamp if it follows the pattern for timestamp storage
- // This is a heuristic - in a better system, we'd have semantic type information
- return false // Conservative approach - require explicit TIMESTAMP type
- default:
- return false
- }
- }
- // getCurrentTableInfo attempts to get table info for the current query context
- // This is a simplified implementation - ideally table context would be passed explicitly
- func (e *SQLEngine) getCurrentTableInfo(database string) (*TableInfo, error) {
- // This is a limitation of the current architecture
- // In practice, we'd need the table context from the current query
- // For now, return an error so callers rely only on explicit schema type information
- // TODO: Enhance architecture to pass table context through query execution
- return nil, fmt.Errorf("table context not available in current architecture")
- }
- // getColumnName extracts column name from expression (handles ColName types)
- func (e *SQLEngine) getColumnName(expr ExprNode) string {
- switch exprType := expr.(type) {
- case *ColName:
- return exprType.Name.String()
- }
- return ""
- }
- // resolveColumnAlias tries to resolve a column name that might be an alias
- func (e *SQLEngine) resolveColumnAlias(columnName string, selectExprs []SelectExpr) string {
- if selectExprs == nil {
- return columnName
- }
- // Check if this column name is actually an alias in the SELECT list
- for _, selectExpr := range selectExprs {
- if aliasedExpr, ok := selectExpr.(*AliasedExpr); ok && aliasedExpr != nil {
- // Check if the alias matches our column name
- if aliasedExpr.As != nil && !aliasedExpr.As.IsEmpty() && aliasedExpr.As.String() == columnName {
- // If the aliased expression is a column, return the actual column name
- if colExpr, ok := aliasedExpr.Expr.(*ColName); ok && colExpr != nil {
- return colExpr.Name.String()
- }
- }
- }
- }
- // If no alias found, return the original column name
- return columnName
- }
- // extractTimeValue parses time values from SQL expressions.
- // Supports integer nanosecond timestamps and several string formats
- // (RFC3339, RFC3339 with nanoseconds, date-only, and "YYYY-MM-DD HH:MM:SS");
- // returns 0 if the value cannot be parsed.
- func (e *SQLEngine) extractTimeValue(expr ExprNode) int64 {
- switch exprType := expr.(type) {
- case *SQLVal:
- switch exprType.Type {
- case IntVal:
- // Parse as nanosecond timestamp
- if val, err := strconv.ParseInt(string(exprType.Val), 10, 64); err == nil {
- return val
- }
- case StrVal:
- // Parse as ISO date or other string formats
- timeStr := string(exprType.Val)
- // Try parsing as RFC3339 (ISO 8601)
- if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
- return t.UnixNano()
- }
- // Try parsing as RFC3339 with nanoseconds
- if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil {
- return t.UnixNano()
- }
- // Try parsing as date only (YYYY-MM-DD)
- if t, err := time.Parse("2006-01-02", timeStr); err == nil {
- return t.UnixNano()
- }
- // Try parsing as datetime (YYYY-MM-DD HH:MM:SS)
- if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil {
- return t.UnixNano()
- }
- }
- }
- return 0 // Couldn't parse
- }
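- // Illustrative sketch (intVal/strVal are hypothetical *SQLVal constructors):
- // each of these inputs resolves to the same instant in nanoseconds:
- //
- //	extractTimeValue(intVal("1700000000000000000")) // raw nanoseconds
- //	extractTimeValue(strVal("2023-11-14T22:13:20Z")) // RFC3339
- //	extractTimeValue(strVal("2023-11-14 22:13:20"))  // datetime form (UTC)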
- // reverseOperator reverses comparison operators when column and value are swapped
- func (e *SQLEngine) reverseOperator(op string) string {
- switch op {
- case GreaterThanStr:
- return LessThanStr
- case GreaterEqualStr:
- return LessEqualStr
- case LessThanStr:
- return GreaterThanStr
- case LessEqualStr:
- return GreaterEqualStr
- case EqualStr:
- return EqualStr
- case NotEqualStr:
- return NotEqualStr
- default:
- return op
- }
- }
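- // Example: the comparison `5 < ts` is normalized to `ts > 5` by reversing
- // the operator when the column appears on the right-hand side (see
- // buildComparisonPredicateWithContext below).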
- // buildPredicate creates a predicate function from a WHERE clause expression
- // This is a simplified implementation - a full implementation would be much more complex
- func (e *SQLEngine) buildPredicate(expr ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- return e.buildPredicateWithContext(expr, nil)
- }
- // buildPredicateWithContext creates a predicate function with SELECT context for alias resolution
- func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
- switch exprType := expr.(type) {
- case *ComparisonExpr:
- return e.buildComparisonPredicateWithContext(exprType, selectExprs)
- case *BetweenExpr:
- return e.buildBetweenPredicateWithContext(exprType, selectExprs)
- case *IsNullExpr:
- return e.buildIsNullPredicateWithContext(exprType, selectExprs)
- case *IsNotNullExpr:
- return e.buildIsNotNullPredicateWithContext(exprType, selectExprs)
- case *AndExpr:
- leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs)
- if err != nil {
- return nil, err
- }
- rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs)
- if err != nil {
- return nil, err
- }
- return func(record *schema_pb.RecordValue) bool {
- return leftPred(record) && rightPred(record)
- }, nil
- case *OrExpr:
- leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs)
- if err != nil {
- return nil, err
- }
- rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs)
- if err != nil {
- return nil, err
- }
- return func(record *schema_pb.RecordValue) bool {
- return leftPred(record) || rightPred(record)
- }, nil
- default:
- return nil, fmt.Errorf("unsupported WHERE expression: %T", expr)
- }
- }
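- // Usage sketch (whereExpr and record are illustrative names): the result is
- // a plain closure applied once per record:
- //
- //	pred, err := e.buildPredicate(whereExpr)
- //	if err == nil && pred(record) {
- //	    // record satisfies the WHERE clause
- //	}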
- // buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support
- func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var compareValue interface{}
- var operator string
- // Check if column is on the left side (normal case: column > value)
- if colName, ok := expr.Left.(*ColName); ok {
- rawColumnName := colName.Name.String()
- // Resolve potential alias to actual column name
- columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
- // Map display names to internal names for system columns
- columnName = e.getSystemColumnInternalName(columnName)
- operator = expr.Operator
- // Extract comparison value from right side
- val, err := e.extractComparisonValue(expr.Right)
- if err != nil {
- return nil, fmt.Errorf("failed to extract right-side value: %v", err)
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
- } else if colName, ok := expr.Right.(*ColName); ok {
- // Column is on the right side (reversed case: value < column)
- rawColumnName := colName.Name.String()
- // Resolve potential alias to actual column name
- columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
- // Map display names to internal names for system columns
- columnName = e.getSystemColumnInternalName(columnName)
- // Reverse the operator when column is on right side
- operator = e.reverseOperator(expr.Operator)
- // Extract comparison value from left side
- val, err := e.extractComparisonValue(expr.Left)
- if err != nil {
- return nil, fmt.Errorf("failed to extract left-side value: %v", err)
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
- } else {
- // Handle literal-only comparisons like 1 = 0, 'a' = 'b', etc.
- leftVal, leftErr := e.extractComparisonValue(expr.Left)
- rightVal, rightErr := e.extractComparisonValue(expr.Right)
- if leftErr != nil || rightErr != nil {
- return nil, fmt.Errorf("no column name found in comparison expression, left: %T, right: %T", expr.Left, expr.Right)
- }
- // Evaluate the literal comparison once
- result := e.compareLiteralValues(leftVal, rightVal, expr.Operator)
- // Return a constant predicate
- return func(record *schema_pb.RecordValue) bool {
- return result
- }, nil
- }
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false // Column doesn't exist in record
- }
- // Use the comparison evaluation function
- return e.evaluateComparison(fieldValue, operator, compareValue)
- }, nil
- }
- // buildBetweenPredicateWithContext creates a predicate for BETWEEN operations
- func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var fromValue, toValue interface{}
- // Check if left side is a column name
- if colName, ok := expr.Left.(*ColName); ok {
- rawColumnName := colName.Name.String()
- // Resolve potential alias to actual column name
- columnName = e.resolveColumnAlias(rawColumnName, selectExprs)
- // Map display names to internal names for system columns
- columnName = e.getSystemColumnInternalName(columnName)
- // Extract FROM value
- fromVal, err := e.extractComparisonValue(expr.From)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
- }
- fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)
- // Extract TO value
- toVal, err := e.extractComparisonValue(expr.To)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
- }
- toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)
- } else {
- return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
- }
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false
- }
- // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
- greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
- lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
- result := greaterThanOrEqualFrom && lessThanOrEqualTo
- // Handle NOT BETWEEN
- if expr.Not {
- result = !result
- }
- return result
- }, nil
- }
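- // Semantics note: `x BETWEEN a AND b` evaluates as x >= a AND x <= b, so
- // both bounds are inclusive, and NOT BETWEEN is the exact negation.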
- // buildIsNullPredicateWithContext creates a predicate for IS NULL operations
- func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
- // Check if the expression is a column name
- if colName, ok := expr.Expr.(*ColName); ok {
- rawColumnName := colName.Name.String()
- // Resolve potential alias to actual column name
- columnName := e.resolveColumnAlias(rawColumnName, selectExprs)
- // Map display names to internal names for system columns
- columnName = e.getSystemColumnInternalName(columnName)
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's null or missing
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return true // Field doesn't exist = NULL
- }
- // Check if the field value itself is null/empty
- return e.isValueNull(fieldValue)
- }, nil
- } else {
- return nil, fmt.Errorf("IS NULL left operand must be a column name, got: %T", expr.Expr)
- }
- }
- // buildIsNotNullPredicateWithContext creates a predicate for IS NOT NULL operations
- func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
- // Check if the expression is a column name
- if colName, ok := expr.Expr.(*ColName); ok {
- rawColumnName := colName.Name.String()
- // Resolve potential alias to actual column name
- columnName := e.resolveColumnAlias(rawColumnName, selectExprs)
- // Map display names to internal names for system columns
- columnName = e.getSystemColumnInternalName(columnName)
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's not null
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false // Field doesn't exist = NULL, so NOT NULL is false
- }
- // Check if the field value itself is not null/empty
- return !e.isValueNull(fieldValue)
- }, nil
- } else {
- return nil, fmt.Errorf("IS NOT NULL left operand must be a column name, got: %T", expr.Expr)
- }
- }
- // isValueNull checks if a schema_pb.Value is null or represents a null value
- func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
- if value == nil {
- return true
- }
- // Check the Kind field to see if it represents a null value
- if value.Kind == nil {
- return true
- }
- // For different value types, check if they represent null/empty values
- switch kind := value.Kind.(type) {
- case *schema_pb.Value_StringValue:
- // Empty string could be considered null depending on semantics
- // For now, treat empty string as not null (SQL standard behavior)
- return false
- case *schema_pb.Value_BoolValue:
- return false // Boolean values are never null
- case *schema_pb.Value_Int32Value, *schema_pb.Value_Int64Value:
- return false // Integer values are never null
- case *schema_pb.Value_FloatValue, *schema_pb.Value_DoubleValue:
- return false // Numeric values are never null
- case *schema_pb.Value_BytesValue:
- // Bytes could be null if empty, but for now treat as not null
- return false
- case *schema_pb.Value_TimestampValue:
- // Check if timestamp is zero/uninitialized
- return kind.TimestampValue == nil
- case *schema_pb.Value_DateValue:
- return kind.DateValue == nil
- case *schema_pb.Value_TimeValue:
- return kind.TimeValue == nil
- default:
- // Unknown type, consider it null to be safe
- return true
- }
- }
- // extractComparisonValue extracts the comparison value from a SQL expression
- func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) {
- switch val := expr.(type) {
- case *SQLVal:
- switch val.Type {
- case IntVal:
- intVal, err := strconv.ParseInt(string(val.Val), 10, 64)
- if err != nil {
- return nil, err
- }
- return intVal, nil
- case StrVal:
- return string(val.Val), nil
- case FloatVal:
- floatVal, err := strconv.ParseFloat(string(val.Val), 64)
- if err != nil {
- return nil, err
- }
- return floatVal, nil
- default:
- return nil, fmt.Errorf("unsupported SQL value type: %v", val.Type)
- }
- case *ArithmeticExpr:
- // Handle arithmetic expressions like CURRENT_TIMESTAMP - INTERVAL '1 hour'
- return e.evaluateArithmeticExpressionForComparison(val)
- case *FuncExpr:
- // Handle function calls like NOW(), CURRENT_TIMESTAMP
- return e.evaluateFunctionExpressionForComparison(val)
- case *IntervalExpr:
- // Handle standalone INTERVAL expressions
- nanos, err := e.evaluateInterval(val.Value)
- if err != nil {
- return nil, err
- }
- return nanos, nil
- case ValTuple:
- // Handle IN expressions with multiple values: column IN (value1, value2, value3)
- var inValues []interface{}
- for _, tupleVal := range val {
- switch v := tupleVal.(type) {
- case *SQLVal:
- switch v.Type {
- case IntVal:
- intVal, err := strconv.ParseInt(string(v.Val), 10, 64)
- if err != nil {
- return nil, err
- }
- inValues = append(inValues, intVal)
- case StrVal:
- inValues = append(inValues, string(v.Val))
- case FloatVal:
- floatVal, err := strconv.ParseFloat(string(v.Val), 64)
- if err != nil {
- return nil, err
- }
- inValues = append(inValues, floatVal)
- default:
- // Fail loudly instead of silently dropping the value, which would
- // make IN(...) quietly miss matches
- return nil, fmt.Errorf("unsupported IN list value type: %v", v.Type)
- }
- default:
- return nil, fmt.Errorf("unsupported IN list expression: %T", tupleVal)
- }
- }
- return inValues, nil
- default:
- return nil, fmt.Errorf("unsupported comparison value type: %T", expr)
- }
- }
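- // Example (hypothetical query): for `status IN ('a', 'b', 3)` the ValTuple
- // branch returns []interface{}{"a", "b", int64(3)}, which valueIn later
- // matches element-by-element via valuesEqual.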
- // evaluateArithmeticExpressionForComparison evaluates an arithmetic expression for WHERE clause comparisons
- func (e *SQLEngine) evaluateArithmeticExpressionForComparison(expr *ArithmeticExpr) (interface{}, error) {
- // Check if this is timestamp arithmetic with intervals
- if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") {
- // Evaluate timestamp arithmetic and return the result as nanoseconds
- result, err := e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator)
- if err != nil {
- return nil, err
- }
- // Extract the timestamp value as nanoseconds for comparison
- if result.Kind != nil {
- switch resultKind := result.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- return resultKind.Int64Value, nil
- case *schema_pb.Value_StringValue:
- // If it's a formatted timestamp string, parse it back to nanoseconds
- if timestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", resultKind.StringValue); err == nil {
- return timestamp.UnixNano(), nil
- }
- return nil, fmt.Errorf("could not parse timestamp string: %s", resultKind.StringValue)
- }
- }
- return nil, fmt.Errorf("invalid timestamp arithmetic result")
- }
- // For other arithmetic operations, we'd need to evaluate them differently
- // For now, return an error for unsupported arithmetic
- return nil, fmt.Errorf("unsupported arithmetic expression in WHERE clause: %s", expr.Operator)
- }
- // evaluateFunctionExpressionForComparison evaluates a function expression for WHERE clause comparisons
- func (e *SQLEngine) evaluateFunctionExpressionForComparison(expr *FuncExpr) (interface{}, error) {
- funcName := strings.ToUpper(expr.Name.String())
- switch funcName {
- case "NOW", "CURRENT_TIMESTAMP":
- result, err := e.Now()
- if err != nil {
- return nil, err
- }
- // Return as nanoseconds for comparison
- if result.Kind != nil {
- if resultKind, ok := result.Kind.(*schema_pb.Value_TimestampValue); ok {
- // Convert microseconds to nanoseconds
- return resultKind.TimestampValue.TimestampMicros * 1000, nil
- }
- }
- return nil, fmt.Errorf("invalid NOW() result: expected TimestampValue, got %T", result.Kind)
- case "CURRENT_DATE":
- result, err := e.CurrentDate()
- if err != nil {
- return nil, err
- }
- // Convert date to nanoseconds (start of day)
- if result.Kind != nil {
- if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok {
- if date, err := time.Parse("2006-01-02", resultKind.StringValue); err == nil {
- return date.UnixNano(), nil
- }
- }
- }
- return nil, fmt.Errorf("invalid CURRENT_DATE result")
- case "CURRENT_TIME":
- result, err := e.CurrentTime()
- if err != nil {
- return nil, err
- }
- // For time comparison, we might need special handling
- // For now, just return the string value
- if result.Kind != nil {
- if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok {
- return resultKind.StringValue, nil
- }
- }
- return nil, fmt.Errorf("invalid CURRENT_TIME result")
- default:
- return nil, fmt.Errorf("unsupported function in WHERE clause: %s", funcName)
- }
- }
- // evaluateComparison performs the actual comparison
- func (e *SQLEngine) evaluateComparison(fieldValue *schema_pb.Value, operator string, compareValue interface{}) bool {
- // This is a simplified implementation
- // A full implementation would handle type coercion and all comparison operators
- switch operator {
- case "=":
- return e.valuesEqual(fieldValue, compareValue)
- case "<":
- return e.valueLessThan(fieldValue, compareValue)
- case ">":
- return e.valueGreaterThan(fieldValue, compareValue)
- case "<=":
- return e.valuesEqual(fieldValue, compareValue) || e.valueLessThan(fieldValue, compareValue)
- case ">=":
- return e.valuesEqual(fieldValue, compareValue) || e.valueGreaterThan(fieldValue, compareValue)
- case "!=", "<>":
- return !e.valuesEqual(fieldValue, compareValue)
- case "LIKE", "like":
- return e.valueLike(fieldValue, compareValue)
- case "IN", "in":
- return e.valueIn(fieldValue, compareValue)
- default:
- return false
- }
- }
- // Helper functions for value comparison with proper type coercion
- func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interface{}) bool {
- // Handle string comparisons first
- if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
- if strVal, ok := compareValue.(string); ok {
- return strField.StringValue == strVal
- }
- return false
- }
- // Handle boolean comparisons
- if boolField, ok := fieldValue.Kind.(*schema_pb.Value_BoolValue); ok {
- if boolVal, ok := compareValue.(bool); ok {
- return boolField.BoolValue == boolVal
- }
- return false
- }
- // Handle logical type comparisons
- if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
- if timestampVal, ok := compareValue.(int64); ok {
- return timestampField.TimestampValue.TimestampMicros == timestampVal
- }
- return false
- }
- if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
- if dateVal, ok := compareValue.(int32); ok {
- return dateField.DateValue.DaysSinceEpoch == dateVal
- }
- return false
- }
- // Handle DecimalValue comparison (convert to string for comparison)
- if decimalField, ok := fieldValue.Kind.(*schema_pb.Value_DecimalValue); ok {
- if decimalStr, ok := compareValue.(string); ok {
- // Convert decimal bytes back to string for comparison
- decimalValue := e.decimalToString(decimalField.DecimalValue)
- return decimalValue == decimalStr
- }
- return false
- }
- if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
- if timeVal, ok := compareValue.(int64); ok {
- return timeField.TimeValue.TimeMicros == timeVal
- }
- return false
- }
- // Handle direct int64 comparisons for timestamp precision (before float64 conversion)
- if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
- if int64Val, ok := compareValue.(int64); ok {
- return int64Field.Int64Value == int64Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int64Field.Int64Value == int64(intVal)
- }
- }
- // Handle direct int32 comparisons
- if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
- if int32Val, ok := compareValue.(int32); ok {
- return int32Field.Int32Value == int32Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int32Field.Int32Value == int32(intVal)
- }
- if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
- return int32Field.Int32Value == int32(int64Val)
- }
- }
- // Handle numeric comparisons with type coercion (fallback for other numeric types)
- fieldNum := e.convertToNumber(fieldValue)
- compareNum := e.convertCompareValueToNumber(compareValue)
- if fieldNum != nil && compareNum != nil {
- return *fieldNum == *compareNum
- }
- return false
- }
- // convertCompareValueToNumber converts compare values from SQL queries to float64
- func (e *SQLEngine) convertCompareValueToNumber(compareValue interface{}) *float64 {
- switch v := compareValue.(type) {
- case int:
- result := float64(v)
- return &result
- case int32:
- result := float64(v)
- return &result
- case int64:
- result := float64(v)
- return &result
- case float32:
- result := float64(v)
- return &result
- case float64:
- return &v
- case string:
- // Try to parse string as number for flexible comparisons
- if parsed, err := strconv.ParseFloat(v, 64); err == nil {
- return &parsed
- }
- }
- return nil
- }
- // decimalToString converts a DecimalValue back to string representation
- func (e *SQLEngine) decimalToString(decimalValue *schema_pb.DecimalValue) string {
- if decimalValue == nil || decimalValue.Value == nil {
- return "0"
- }
- // Convert bytes back to big.Int
- intValue := new(big.Int).SetBytes(decimalValue.Value)
- // Convert to string with proper decimal placement
- str := intValue.String()
- // Handle decimal placement based on scale
- scale := int(decimalValue.Scale)
- if scale > 0 {
- // Pad with leading zeros so short values keep their scale
- // (e.g. raw 45 with scale 2 becomes "0.45", not "45")
- if len(str) <= scale {
- str = strings.Repeat("0", scale-len(str)+1) + str
- }
- // Insert decimal point
- decimalPos := len(str) - scale
- return str[:decimalPos] + "." + str[decimalPos:]
- }
- return str
- }
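- // Example: bytes encoding 12345 with Scale=2 render as "123.45"; bytes
- // encoding 45 with Scale=2 render as "0.45" after zero padding.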
- func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool {
- // Handle string comparisons lexicographically
- if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
- if strVal, ok := compareValue.(string); ok {
- return strField.StringValue < strVal
- }
- return false
- }
- // Handle logical type comparisons
- if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
- if timestampVal, ok := compareValue.(int64); ok {
- return timestampField.TimestampValue.TimestampMicros < timestampVal
- }
- return false
- }
- if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
- if dateVal, ok := compareValue.(int32); ok {
- return dateField.DateValue.DaysSinceEpoch < dateVal
- }
- return false
- }
- if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
- if timeVal, ok := compareValue.(int64); ok {
- return timeField.TimeValue.TimeMicros < timeVal
- }
- return false
- }
- // Handle direct int64 comparisons for timestamp precision (before float64 conversion)
- if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
- if int64Val, ok := compareValue.(int64); ok {
- return int64Field.Int64Value < int64Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int64Field.Int64Value < int64(intVal)
- }
- }
- // Handle direct int32 comparisons
- if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
- if int32Val, ok := compareValue.(int32); ok {
- return int32Field.Int32Value < int32Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int32Field.Int32Value < int32(intVal)
- }
- if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
- return int32Field.Int32Value < int32(int64Val)
- }
- }
- // Handle numeric comparisons with type coercion (fallback for other numeric types)
- fieldNum := e.convertToNumber(fieldValue)
- compareNum := e.convertCompareValueToNumber(compareValue)
- if fieldNum != nil && compareNum != nil {
- return *fieldNum < *compareNum
- }
- return false
- }
- func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue interface{}) bool {
- // Handle string comparisons lexicographically
- if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok {
- if strVal, ok := compareValue.(string); ok {
- return strField.StringValue > strVal
- }
- return false
- }
- // Handle logical type comparisons
- if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok {
- if timestampVal, ok := compareValue.(int64); ok {
- return timestampField.TimestampValue.TimestampMicros > timestampVal
- }
- return false
- }
- if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok {
- if dateVal, ok := compareValue.(int32); ok {
- return dateField.DateValue.DaysSinceEpoch > dateVal
- }
- return false
- }
- if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok {
- if timeVal, ok := compareValue.(int64); ok {
- return timeField.TimeValue.TimeMicros > timeVal
- }
- return false
- }
- // Handle direct int64 comparisons for timestamp precision (before float64 conversion)
- if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok {
- if int64Val, ok := compareValue.(int64); ok {
- return int64Field.Int64Value > int64Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int64Field.Int64Value > int64(intVal)
- }
- }
- // Handle direct int32 comparisons
- if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
- if int32Val, ok := compareValue.(int32); ok {
- return int32Field.Int32Value > int32Val
- }
- if intVal, ok := compareValue.(int); ok {
- return int32Field.Int32Value > int32(intVal)
- }
- if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
- return int32Field.Int32Value > int32(int64Val)
- }
- }
- // Handle numeric comparisons with type coercion (fallback for other numeric types)
- fieldNum := e.convertToNumber(fieldValue)
- compareNum := e.convertCompareValueToNumber(compareValue)
- if fieldNum != nil && compareNum != nil {
- return *fieldNum > *compareNum
- }
- return false
- }
- // valueLike implements SQL LIKE pattern matching with % and _ wildcards
- func (e *SQLEngine) valueLike(fieldValue *schema_pb.Value, compareValue interface{}) bool {
- // Only support LIKE for string values
- stringVal, ok := fieldValue.Kind.(*schema_pb.Value_StringValue)
- if !ok {
- return false
- }
- pattern, ok := compareValue.(string)
- if !ok {
- return false
- }
- // Convert the SQL LIKE pattern to a Go regex pattern.
- // Escape regex metacharacters first so literal '.', '(', '+', etc. in the
- // pattern are matched literally; QuoteMeta leaves '%' and '_' untouched,
- // so the SQL wildcards can then be mapped:
- // % matches any sequence of characters (.*), _ matches a single character (.)
- regexPattern := regexp.QuoteMeta(pattern)
- regexPattern = strings.ReplaceAll(regexPattern, "%", ".*")
- regexPattern = strings.ReplaceAll(regexPattern, "_", ".")
- regexPattern = "^" + regexPattern + "$" // Anchor to match the entire string
- // Compile and match regex
- regex, err := regexp.Compile(regexPattern)
- if err != nil {
- return false // Invalid pattern
- }
- return regex.MatchString(stringVal.StringValue)
- }
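- // Example: the LIKE pattern "50%_off" is QuoteMeta-escaped (a no-op here),
- // then rewritten to the anchored regex ^50.*.off$, which matches "50% off"
- // but not "big 50% off sale" because of the ^...$ anchors.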
- // valueIn implements SQL IN operator for checking if value exists in a list
- func (e *SQLEngine) valueIn(fieldValue *schema_pb.Value, compareValue interface{}) bool {
- // For now, handle simple case where compareValue is a slice of values
- // In a full implementation, this would handle SQL IN expressions properly
- values, ok := compareValue.([]interface{})
- if !ok {
- return false
- }
- // Check if fieldValue matches any value in the list
- for _, value := range values {
- if e.valuesEqual(fieldValue, value) {
- return true
- }
- }
- return false
- }
- // Helper methods for specific operations
- func (e *SQLEngine) showDatabases(ctx context.Context) (*QueryResult, error) {
- databases := e.catalog.ListDatabases()
- result := &QueryResult{
- Columns: []string{"Database"},
- Rows: make([][]sqltypes.Value, len(databases)),
- }
- for i, db := range databases {
- result.Rows[i] = []sqltypes.Value{
- sqltypes.NewVarChar(db),
- }
- }
- return result, nil
- }
- func (e *SQLEngine) showTables(ctx context.Context, dbName string) (*QueryResult, error) {
- // Use current database context if no database specified
- if dbName == "" {
- dbName = e.catalog.GetCurrentDatabase()
- if dbName == "" {
- dbName = "default"
- }
- }
- tables, err := e.catalog.ListTables(dbName)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- result := &QueryResult{
- Columns: []string{"Tables_in_" + dbName},
- Rows: make([][]sqltypes.Value, len(tables)),
- }
- for i, table := range tables {
- result.Rows[i] = []sqltypes.Value{
- sqltypes.NewVarChar(table),
- }
- }
- return result, nil
- }
- // compareLiteralValues compares two literal values with the given operator
- func (e *SQLEngine) compareLiteralValues(left, right interface{}, operator string) bool {
- switch operator {
- case "=", "==":
- return e.literalValuesEqual(left, right)
- case "!=", "<>":
- return !e.literalValuesEqual(left, right)
- case "<":
- return e.compareLiteralNumber(left, right) < 0
- case "<=":
- return e.compareLiteralNumber(left, right) <= 0
- case ">":
- return e.compareLiteralNumber(left, right) > 0
- case ">=":
- return e.compareLiteralNumber(left, right) >= 0
- default:
- // For unsupported operators, default to false
- return false
- }
- }
- // literalValuesEqual checks if two literal values are equal
- func (e *SQLEngine) literalValuesEqual(left, right interface{}) bool {
- // Convert both to strings for comparison
- leftStr := fmt.Sprintf("%v", left)
- rightStr := fmt.Sprintf("%v", right)
- return leftStr == rightStr
- }
- // compareLiteralNumber compares two values as numbers
- func (e *SQLEngine) compareLiteralNumber(left, right interface{}) int {
- leftNum, leftOk := e.convertToFloat64(left)
- rightNum, rightOk := e.convertToFloat64(right)
- if !leftOk || !rightOk {
- // Fall back to string comparison if not numeric
- leftStr := fmt.Sprintf("%v", left)
- rightStr := fmt.Sprintf("%v", right)
- if leftStr < rightStr {
- return -1
- } else if leftStr > rightStr {
- return 1
- } else {
- return 0
- }
- }
- if leftNum < rightNum {
- return -1
- } else if leftNum > rightNum {
- return 1
- } else {
- return 0
- }
- }
- // convertToFloat64 attempts to convert a value to float64
- func (e *SQLEngine) convertToFloat64(value interface{}) (float64, bool) {
- switch v := value.(type) {
- case int64:
- return float64(v), true
- case int32:
- return float64(v), true
- case int:
- return float64(v), true
- case float64:
- return v, true
- case float32:
- return float64(v), true
- case string:
- if num, err := strconv.ParseFloat(v, 64); err == nil {
- return num, true
- }
- return 0, false
- default:
- return 0, false
- }
- }
- func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) {
- // Parse CREATE TABLE statement
- // Assumption: Table name format is [database.]table_name
- tableName := stmt.NewName.Name.String()
- database := ""
- // Check if database is specified in table name
- if stmt.NewName.Qualifier.String() != "" {
- database = stmt.NewName.Qualifier.String()
- } else {
- // Use current database context or default
- database = e.catalog.GetCurrentDatabase()
- if database == "" {
- database = "default"
- }
- }
- // Parse column definitions from CREATE TABLE
- // Assumption: stmt.TableSpec contains column definitions
- if stmt.TableSpec == nil || len(stmt.TableSpec.Columns) == 0 {
- err := fmt.Errorf("CREATE TABLE requires column definitions")
- return &QueryResult{Error: err}, err
- }
- // Convert SQL columns to MQ schema fields
- fields := make([]*schema_pb.Field, len(stmt.TableSpec.Columns))
- for i, col := range stmt.TableSpec.Columns {
- fieldType, err := e.convertSQLTypeToMQ(col.Type)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- fields[i] = &schema_pb.Field{
- Name: col.Name.String(),
- Type: fieldType,
- }
- }
- // Create record type for the topic
- recordType := &schema_pb.RecordType{
- Fields: fields,
- }
- // Create the topic via broker using configurable partition count
- partitionCount := e.catalog.GetDefaultPartitionCount()
- err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- // Register the new topic in catalog
- mqSchema := &schema.Schema{
- Namespace: database,
- Name: tableName,
- RecordType: recordType,
- RevisionId: 1, // Initial revision
- }
- err = e.catalog.RegisterTopic(database, tableName, mqSchema)
- if err != nil {
- return &QueryResult{Error: err}, err
- }
- // Return success result
- result := &QueryResult{
- Columns: []string{"Result"},
- Rows: [][]sqltypes.Value{
- {sqltypes.NewVarChar(fmt.Sprintf("Table '%s.%s' created successfully", database, tableName))},
- },
- }
- return result, nil
- }
- // ExecutionPlanBuilder handles building execution plans for queries
- type ExecutionPlanBuilder struct {
- engine *SQLEngine
- }
- // NewExecutionPlanBuilder creates a new execution plan builder
- func NewExecutionPlanBuilder(engine *SQLEngine) *ExecutionPlanBuilder {
- return &ExecutionPlanBuilder{engine: engine}
- }
- // BuildAggregationPlan builds an execution plan for aggregation queries
- func (builder *ExecutionPlanBuilder) BuildAggregationPlan(
- stmt *SelectStatement,
- aggregations []AggregationSpec,
- strategy AggregationStrategy,
- dataSources *TopicDataSources,
- ) *QueryExecutionPlan {
- plan := &QueryExecutionPlan{
- QueryType: "SELECT",
- ExecutionStrategy: builder.determineExecutionStrategy(stmt, strategy),
- DataSources: builder.buildDataSourcesList(strategy, dataSources),
- PartitionsScanned: dataSources.PartitionsCount,
- ParquetFilesScanned: builder.countParquetFiles(dataSources),
- LiveLogFilesScanned: builder.countLiveLogFiles(dataSources),
- OptimizationsUsed: builder.buildOptimizationsList(stmt, strategy, dataSources),
- Aggregations: builder.buildAggregationsList(aggregations),
- Details: make(map[string]interface{}),
- }
- // Set row counts based on strategy
- if strategy.CanUseFastPath {
- // Only live logs and broker buffer rows are actually scanned; parquet uses metadata
- plan.TotalRowsProcessed = dataSources.LiveLogRowCount
- if dataSources.BrokerUnflushedCount > 0 {
- plan.TotalRowsProcessed += dataSources.BrokerUnflushedCount
- }
- // Set scan method based on what data sources actually exist
- if dataSources.ParquetRowCount > 0 && (dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
- plan.Details["scan_method"] = "Parquet Metadata + Live Log/Broker Counting"
- } else if dataSources.ParquetRowCount > 0 {
- plan.Details["scan_method"] = "Parquet Metadata Only"
- } else {
- plan.Details["scan_method"] = "Live Log/Broker Counting Only"
- }
- } else {
- plan.TotalRowsProcessed = dataSources.ParquetRowCount + dataSources.LiveLogRowCount
- plan.Details["scan_method"] = "Full Data Scan"
- }
- return plan
- }
- // determineExecutionStrategy determines the execution strategy based on query characteristics
- func (builder *ExecutionPlanBuilder) determineExecutionStrategy(stmt *SelectStatement, strategy AggregationStrategy) string {
- if stmt.Where != nil {
- return "full_scan"
- }
- if strategy.CanUseFastPath {
- return "hybrid_fast_path"
- }
- return "full_scan"
- }
- // buildDataSourcesList builds the list of data sources used
- func (builder *ExecutionPlanBuilder) buildDataSourcesList(strategy AggregationStrategy, dataSources *TopicDataSources) []string {
- sources := []string{}
- if strategy.CanUseFastPath {
- // Only show parquet stats if there are actual parquet files
- if dataSources.ParquetRowCount > 0 {
- sources = append(sources, "parquet_stats")
- }
- if dataSources.LiveLogRowCount > 0 {
- sources = append(sources, "live_logs")
- }
- if dataSources.BrokerUnflushedCount > 0 {
- sources = append(sources, "broker_buffer")
- }
- } else {
- sources = append(sources, "live_logs", "parquet_files")
- }
- // Note: broker_buffer is added dynamically during execution when broker is queried
- // See aggregations.go lines 397-409 for the broker buffer data source addition logic
- return sources
- }
- // countParquetFiles counts the total number of parquet files across all partitions
- func (builder *ExecutionPlanBuilder) countParquetFiles(dataSources *TopicDataSources) int {
- count := 0
- for _, fileStats := range dataSources.ParquetFiles {
- count += len(fileStats)
- }
- return count
- }
- // countLiveLogFiles returns the total number of live log files across all partitions
- func (builder *ExecutionPlanBuilder) countLiveLogFiles(dataSources *TopicDataSources) int {
- return dataSources.LiveLogFilesCount
- }
- // buildOptimizationsList builds the list of optimizations used
- func (builder *ExecutionPlanBuilder) buildOptimizationsList(stmt *SelectStatement, strategy AggregationStrategy, dataSources *TopicDataSources) []string {
- optimizations := []string{}
- if strategy.CanUseFastPath {
- // Only include parquet statistics if there are actual parquet files
- if dataSources.ParquetRowCount > 0 {
- optimizations = append(optimizations, "parquet_statistics")
- }
- if dataSources.LiveLogRowCount > 0 {
- optimizations = append(optimizations, "live_log_counting")
- }
- // Always include deduplication when using fast path
- optimizations = append(optimizations, "deduplication")
- }
- if stmt.Where != nil {
- // Check if "predicate_pushdown" is already in the list
- found := false
- for _, opt := range optimizations {
- if opt == "predicate_pushdown" {
- found = true
- break
- }
- }
- if !found {
- optimizations = append(optimizations, "predicate_pushdown")
- }
- }
- return optimizations
- }
- // buildAggregationsList builds the list of aggregations for display
- func (builder *ExecutionPlanBuilder) buildAggregationsList(aggregations []AggregationSpec) []string {
- aggList := make([]string, len(aggregations))
- for i, spec := range aggregations {
- aggList[i] = fmt.Sprintf("%s(%s)", spec.Function, spec.Column)
- }
- return aggList
- }
- // parseAggregationFunction parses an aggregation function expression
- func (e *SQLEngine) parseAggregationFunction(funcExpr *FuncExpr, aliasExpr *AliasedExpr) (*AggregationSpec, error) {
- funcName := strings.ToUpper(funcExpr.Name.String())
- spec := &AggregationSpec{
- Function: funcName,
- }
- // Parse function arguments
- switch funcName {
- case FuncCOUNT:
- if len(funcExpr.Exprs) != 1 {
- return nil, fmt.Errorf("COUNT function expects exactly 1 argument")
- }
- switch arg := funcExpr.Exprs[0].(type) {
- case *StarExpr:
- spec.Column = "*"
- spec.Alias = "COUNT(*)"
- case *AliasedExpr:
- if colName, ok := arg.Expr.(*ColName); ok {
- spec.Column = colName.Name.String()
- spec.Alias = fmt.Sprintf("COUNT(%s)", spec.Column)
- } else {
- return nil, fmt.Errorf("COUNT argument must be a column name or *")
- }
- default:
- return nil, fmt.Errorf("unsupported COUNT argument: %T", arg)
- }
- case FuncSUM, FuncAVG, FuncMIN, FuncMAX:
- if len(funcExpr.Exprs) != 1 {
- return nil, fmt.Errorf("%s function expects exactly 1 argument", funcName)
- }
- switch arg := funcExpr.Exprs[0].(type) {
- case *AliasedExpr:
- if colName, ok := arg.Expr.(*ColName); ok {
- spec.Column = colName.Name.String()
- spec.Alias = fmt.Sprintf("%s(%s)", funcName, spec.Column)
- } else {
- return nil, fmt.Errorf("%s argument must be a column name", funcName)
- }
- default:
- return nil, fmt.Errorf("unsupported %s argument: %T", funcName, arg)
- }
- default:
- return nil, fmt.Errorf("unsupported aggregation function: %s", funcName)
- }
- // Override with user-specified alias if provided
- if aliasExpr != nil && aliasExpr.As != nil && !aliasExpr.As.IsEmpty() {
- spec.Alias = aliasExpr.As.String()
- }
- return spec, nil
- }
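- // Example: `SELECT COUNT(*) AS total, MAX(ts) FROM t` produces two specs:
- // {Function: "COUNT", Column: "*", Alias: "total"} (the user alias wins)
- // and {Function: "MAX", Column: "ts", Alias: "MAX(ts)"}.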
- // computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column
- func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) {
- if e.catalog.brokerClient == nil {
- return nil, nil, fmt.Errorf("no broker client available")
- }
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return nil, nil, fmt.Errorf("failed to get filer client: %v", err)
- }
- var minValue, maxValue interface{}
- var minSchemaValue, maxSchemaValue *schema_pb.Value
- // Process each live log file
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- // Skip parquet files and directories
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
- return nil
- }
- // Skip files that have been converted to parquet (deduplication)
- if parquetSourceFiles[entry.Name] {
- return nil
- }
- filePath := partitionPath + "/" + entry.Name
- // Scan this log file for MIN/MAX values
- fileMin, fileMax, err := e.computeFileMinMax(filerClient, filePath, columnName)
- if err != nil {
- fmt.Printf("Warning: failed to compute min/max for file %s: %v\n", filePath, err)
- return nil // Continue with other files
- }
- // Update global min/max
- if fileMin != nil {
- if minSchemaValue == nil || e.compareValues(fileMin, minSchemaValue) < 0 {
- minSchemaValue = fileMin
- minValue = e.extractRawValue(fileMin)
- }
- }
- if fileMax != nil {
- if maxSchemaValue == nil || e.compareValues(fileMax, maxSchemaValue) > 0 {
- maxSchemaValue = fileMax
- maxValue = e.extractRawValue(fileMax)
- }
- }
- return nil
- })
- if err != nil {
- return nil, nil, fmt.Errorf("failed to process partition directory %s: %v", partitionPath, err)
- }
- return minValue, maxValue, nil
- }
- // computeFileMinMax scans a single log file to find MIN/MAX values for a specific column
- func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath string, columnName string) (*schema_pb.Value, *schema_pb.Value, error) {
- var minValue, maxValue *schema_pb.Value
- err := e.eachLogEntryInFile(filerClient, filePath, func(logEntry *filer_pb.LogEntry) error {
- // Convert log entry to record value
- recordValue, _, err := e.convertLogEntryToRecordValue(logEntry)
- if err != nil {
- return err // This will stop processing this file but not fail the overall query
- }
- // Extract the requested column value
- var columnValue *schema_pb.Value
- if e.isSystemColumn(columnName) {
- // Handle system columns
- switch strings.ToLower(columnName) {
- case SW_COLUMN_NAME_TIMESTAMP:
- columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}}
- case SW_COLUMN_NAME_KEY:
- columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}}
- case SW_COLUMN_NAME_SOURCE:
- columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}}
- }
- } else {
- // Handle regular data columns
- if value, exists := recordValue.Fields[columnName]; exists {
- columnValue = value
- }
- }
- if columnValue == nil {
- return nil // Skip this record
- }
- // Update min/max
- if minValue == nil || e.compareValues(columnValue, minValue) < 0 {
- minValue = columnValue
- }
- if maxValue == nil || e.compareValues(columnValue, maxValue) > 0 {
- maxValue = columnValue
- }
- return nil
- })
- return minValue, maxValue, err
- }
- // eachLogEntryInFile reads a log file and calls the provided function for each log entry
- func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePath string, fn func(*filer_pb.LogEntry) error) error {
- // Extract directory and filename
- // filePath is like "partitionPath/filename"
- lastSlash := strings.LastIndex(filePath, "/")
- if lastSlash == -1 {
- return fmt.Errorf("invalid file path: %s", filePath)
- }
- dirPath := filePath[:lastSlash]
- fileName := filePath[lastSlash+1:]
- // Get file entry
- var fileEntry *filer_pb.Entry
- err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.Name == fileName {
- fileEntry = entry
- }
- return nil
- })
- if err != nil {
- return fmt.Errorf("failed to find file %s: %v", filePath, err)
- }
- if fileEntry == nil {
- return fmt.Errorf("file not found: %s", filePath)
- }
- lookupFileIdFn := filer.LookupFn(filerClient)
- // eachChunkFn processes each chunk's data (pattern from countRowsInLogFile)
- eachChunkFn := func(buf []byte) error {
- for pos := 0; pos+4 < len(buf); {
- size := util.BytesToUint32(buf[pos : pos+4])
- if pos+4+int(size) > len(buf) {
- break
- }
- entryData := buf[pos+4 : pos+4+int(size)]
- logEntry := &filer_pb.LogEntry{}
- if err := proto.Unmarshal(entryData, logEntry); err != nil {
- pos += 4 + int(size)
- continue // Skip corrupted entries
- }
- // Call the provided function for each log entry
- if err := fn(logEntry); err != nil {
- return err
- }
- pos += 4 + int(size)
- }
- return nil
- }
- // Read file chunks and process them (pattern from countRowsInLogFile)
- fileSize := filer.FileSize(fileEntry)
- visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, fileEntry.Chunks, 0, int64(fileSize))
- chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
- for x := chunkViews.Front(); x != nil; x = x.Next {
- chunk := x.Value
- urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
- if err != nil {
- fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
- continue
- }
- if len(urlStrings) == 0 {
- continue
- }
- // Read chunk data
- // urlStrings[0] is already a complete URL (http://server:port/fileId)
- data, _, err := util_http.Get(urlStrings[0])
- if err != nil {
- fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
- continue
- }
- // Process this chunk
- if err := eachChunkFn(data); err != nil {
- return err
- }
- }
- return nil
- }
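- // Framing sketch for the chunk parser above: each chunk is a sequence of
- // [4-byte length][protobuf LogEntry] records, so the loop decodes the
- // length prefix, unmarshals the next size bytes, and advances pos by
- // 4+size, skipping entries that fail to unmarshal.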
- // convertLogEntryToRecordValue helper method (reuse existing logic)
- func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
- // Parse the log entry data as Protocol Buffer (not JSON!)
- recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil {
- return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err)
- }
- // Ensure Fields map exists
- if recordValue.Fields == nil {
- recordValue.Fields = make(map[string]*schema_pb.Value)
- }
- // Add system columns
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
- }
- recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
- }
- // User data fields are already present in the protobuf-deserialized recordValue
- // No additional processing needed since proto.Unmarshal already populated the Fields map
- return recordValue, "live_log", nil
- }
- // extractTimestampFromFilename extracts timestamp from parquet filename
- // Format: YYYY-MM-DD-HH-MM-SS.parquet
- func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
- // Remove .parquet extension
- filename = strings.TrimSuffix(filename, ".parquet")
- // Parse timestamp format: 2006-01-02-15-04-05
- t, err := time.Parse("2006-01-02-15-04-05", filename)
- if err != nil {
- return 0
- }
- return t.UnixNano()
- }
- // extractParquetSourceFiles derives deduplication keys for live log files already covered by parquet files.
- // Ideally these would come from parquet Extended metadata; as implemented here it falls back to the
- // timestamp embedded in each parquet filename (see getParquetSourceFilesFromMetadata for the accurate path).
- func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
- sourceFiles := make(map[string]bool)
- for _, fileStat := range fileStats {
- // Each ParquetFileStats should reference the original filer entry, but
- // reaching its Extended metadata would require going through the hybrid
- // scanner. As a simplified fallback, deduplicate by the timestamp embedded
- // in the parquet filename (YYYY-MM-DD-HH-MM-SS.parquet).
- if strings.HasSuffix(fileStat.FileName, ".parquet") {
- timeStr := strings.TrimSuffix(fileStat.FileName, ".parquet")
- // Mark this timestamp range as covered by parquet
- sourceFiles[timeStr] = true
- }
- }
- return sourceFiles
- }
- // countLiveLogRowsExcludingParquetSources counts live log rows, excluding both files already
- // converted to parquet and files holding duplicate log buffer data
- func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
- debugEnabled := ctx != nil && isDebugMode(ctx)
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return 0, err
- }
- // First, get the actual source files from parquet metadata
- actualSourceFiles, err := e.getParquetSourceFilesFromMetadata(partitionPath)
- if err != nil {
- // If we can't read parquet metadata, use filename-based fallback
- if debugEnabled {
- fmt.Printf("Warning: failed to read parquet metadata, using filename-based deduplication: %v\n", err)
- }
- actualSourceFiles = parquetSourceFiles
- }
- // Second, get duplicate files from log buffer metadata
- logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
- if err != nil {
- if debugEnabled {
- fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
- }
- logBufferDuplicates = make(map[string]bool)
- }
- // Debug: Show deduplication status (only in explain mode)
- if debugEnabled {
- if len(actualSourceFiles) > 0 {
- fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
- }
- if len(logBufferDuplicates) > 0 {
- fmt.Printf("Excluding %d duplicate log buffer files from %s\n", len(logBufferDuplicates), partitionPath)
- }
- }
- totalRows := int64(0)
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
- return nil // Skip directories and parquet files
- }
- // Skip files that have been converted to parquet
- if actualSourceFiles[entry.Name] {
- if debugEnabled {
- fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
- }
- return nil
- }
- // Skip files that are duplicated due to log buffer metadata
- if logBufferDuplicates[entry.Name] {
- if debugEnabled {
- fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
- }
- return nil
- }
- // Count rows in live log file
- rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
- if err != nil {
- fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
- return nil // Continue with other files
- }
- totalRows += rowCount
- return nil
- })
- return totalRows, err
- }
- // getParquetSourceFilesFromMetadata reads parquet file metadata to get actual source log files
- func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map[string]bool, error) {
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return nil, err
- }
- sourceFiles := make(map[string]bool)
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
- return nil
- }
- // Read source files from Extended metadata
- if entry.Extended != nil && entry.Extended["sources"] != nil {
- var sources []string
- if err := json.Unmarshal(entry.Extended["sources"], &sources); err == nil {
- for _, source := range sources {
- sourceFiles[source] = true
- }
- }
- }
- return nil
- })
- return sourceFiles, err
- }
- // getLogBufferStartFromFile reads buffer start from file extended attributes
- func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) {
- if entry.Extended == nil {
- return nil, nil
- }
- // Only support binary buffer_start format
- if startData, exists := entry.Extended["buffer_start"]; exists {
- if len(startData) == 8 {
- startIndex := int64(binary.BigEndian.Uint64(startData))
- if startIndex > 0 {
- return &LogBufferStart{StartIndex: startIndex}, nil
- }
- } else {
- return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData))
- }
- }
- return nil, nil
- }
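- // Example: an 8-byte "buffer_start" attribute holding big-endian
- // 0x00000000000003E8 decodes to StartIndex 1000; any other length is
- // rejected as malformed, and a non-positive index is treated as absent.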
- // buildLogBufferDeduplicationMap creates a map of duplicate files detected via buffer-range
- // overlap, which avoids tracking individual buffer indexes
- func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
- debugEnabled := ctx != nil && isDebugMode(ctx)
- if e.catalog.brokerClient == nil {
- return make(map[string]bool), nil
- }
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return make(map[string]bool), nil // Don't fail the query, just skip deduplication
- }
- // Track buffer ranges instead of individual indexes (much more efficient)
- type BufferRange struct {
- start, end int64
- }
- processedRanges := make([]BufferRange, 0)
- duplicateFiles := make(map[string]bool)
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
- return nil // Skip directories and parquet files
- }
- // Get buffer start for this file (most efficient)
- bufferStart, err := e.getLogBufferStartFromFile(entry)
- if err != nil || bufferStart == nil {
- return nil // No buffer info, can't deduplicate
- }
- // Calculate range for this file: [start, start + chunkCount - 1]
- chunkCount := int64(len(entry.GetChunks()))
- if chunkCount == 0 {
- return nil // Empty file, skip
- }
- fileRange := BufferRange{
- start: bufferStart.StartIndex,
- end: bufferStart.StartIndex + chunkCount - 1,
- }
- // Check if this range overlaps with any processed range
- isDuplicate := false
- for _, processedRange := range processedRanges {
- if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
- // Ranges overlap - this file contains duplicate buffer indexes
- isDuplicate = true
- if debugEnabled {
- fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
- entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
- }
- break
- }
- }
- if isDuplicate {
- duplicateFiles[entry.Name] = true
- } else {
- // Add this range to processed ranges
- processedRanges = append(processedRanges, fileRange)
- }
- return nil
- })
- if err != nil {
- return make(map[string]bool), nil // Don't fail the query
- }
- return duplicateFiles, nil
- }
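- // Example of the overlap test above: with processed range [100-149], a file
- // covering [150-199] is kept (no overlap), while one covering [140-159] is
- // marked duplicate because 140 <= 149 and 159 >= 100.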
- // countRowsInLogFile counts rows in a single log file using SeaweedFS patterns
- func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) {
- lookupFileIdFn := filer.LookupFn(filerClient)
- rowCount := int64(0)
- // eachChunkFn processes each chunk's data (pattern from read_log_from_disk.go)
- eachChunkFn := func(buf []byte) error {
- for pos := 0; pos+4 < len(buf); {
- size := util.BytesToUint32(buf[pos : pos+4])
- if pos+4+int(size) > len(buf) {
- break
- }
- entryData := buf[pos+4 : pos+4+int(size)]
- logEntry := &filer_pb.LogEntry{}
- if err := proto.Unmarshal(entryData, logEntry); err != nil {
- pos += 4 + int(size)
- continue // Skip corrupted entries
- }
- // Skip control messages (publisher control, empty key, or no data)
- if isControlLogEntry(logEntry) {
- pos += 4 + int(size)
- continue
- }
- rowCount++
- pos += 4 + int(size)
- }
- return nil
- }
- // Read file chunks and process them (pattern from read_log_from_disk.go)
- fileSize := filer.FileSize(entry)
- visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
- chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
- for x := chunkViews.Front(); x != nil; x = x.Next {
- chunk := x.Value
- urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId)
- if err != nil {
- fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err)
- continue
- }
- if len(urlStrings) == 0 {
- continue
- }
- // Read chunk data
- // urlStrings[0] is already a complete URL (http://server:port/fileId)
- data, _, err := util_http.Get(urlStrings[0])
- if err != nil {
- fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err)
- continue
- }
- // Process this chunk
- if err := eachChunkFn(data); err != nil {
- return rowCount, err
- }
- }
- return rowCount, nil
- }
- // isControlLogEntry checks if a log entry is a control entry without actual user data
- // Control entries include:
- // - DataMessages with populated Ctrl field (publisher control signals)
- // - Entries with empty keys (filtered by subscriber)
- // - Entries with no data
- func isControlLogEntry(logEntry *filer_pb.LogEntry) bool {
- // No data: control or placeholder
- if len(logEntry.Data) == 0 {
- return true
- }
- // Empty keys are treated as control entries (consistent with subscriber filtering)
- if len(logEntry.Key) == 0 {
- return true
- }
- // Check if the payload is a DataMessage carrying a control signal
- dataMessage := &mq_pb.DataMessage{}
- if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
- if dataMessage.Ctrl != nil {
- return true
- }
- }
- return false
- }
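- // Classification sketch (payloads are illustrative):
- //
- //    isControlLogEntry(&filer_pb.LogEntry{})                           // true: no data
- //    isControlLogEntry(&filer_pb.LogEntry{Data: data})                 // true: empty key
- //    isControlLogEntry(&filer_pb.LogEntry{Key: k, Data: ctrlMsgBytes}) // true: DataMessage with Ctrl set
- //    isControlLogEntry(&filer_pb.LogEntry{Key: k, Data: recordBytes})  // false: counted as a row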
- // discoverTopicPartitions discovers all partitions for a given topic using centralized logic
- func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) {
- // Use centralized topic partition discovery
- t := topic.NewTopic(namespace, topicName)
- // Get FilerClient from BrokerClient
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return nil, err
- }
- return t.DiscoverPartitions(context.Background(), filerClient)
- }
- // getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs)
- func (e *SQLEngine) getTopicTotalRowCount(ctx context.Context, namespace, topicName string) (int64, error) {
- // Create a hybrid scanner to access parquet statistics
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- var filerClientErr error
- filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
- if filerClientErr != nil {
- return 0, filerClientErr
- }
- }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e)
- if err != nil {
- return 0, err
- }
- // Get all partitions for this topic
- // Note: discoverTopicPartitions always returns absolute paths
- partitions, err := e.discoverTopicPartitions(namespace, topicName)
- if err != nil {
- return 0, err
- }
- totalRowCount := int64(0)
- // For each partition, count both parquet and live log rows
- for _, partition := range partitions {
- // Count parquet rows
- parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
- if parquetErr == nil {
- for _, stats := range parquetStats {
- totalRowCount += stats.RowCount
- }
- }
- // Count live log rows (with deduplication)
- parquetSourceFiles := make(map[string]bool)
- if parquetErr == nil {
- parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
- }
- liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
- if liveLogErr == nil {
- totalRowCount += liveLogCount
- }
- }
- return totalRowCount, nil
- }
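- // In other words, per partition: total = sum(parquet RowCount) + live log rows
- // excluding files already compacted into parquet, so compacted rows are never
- // counted twice.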
- // getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations
- // (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only)
- func (e *SQLEngine) getActualRowsScannedForFastPath(ctx context.Context, namespace, topicName string) (int64, error) {
- // Create a hybrid scanner to access parquet statistics
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- var filerClientErr error
- filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient()
- if filerClientErr != nil {
- return 0, filerClientErr
- }
- }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e)
- if err != nil {
- return 0, err
- }
- // Get all partitions for this topic
- // Note: discoverTopicPartitions always returns absolute paths
- partitions, err := e.discoverTopicPartitions(namespace, topicName)
- if err != nil {
- return 0, err
- }
- totalScannedRows := int64(0)
- // For each partition, count ONLY the live log rows that need scanning
- // (parquet files use metadata/statistics, so they contribute 0 to scan count)
- for _, partition := range partitions {
- // Get parquet files to determine what was converted
- parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition)
- parquetSourceFiles := make(map[string]bool)
- if parquetErr == nil {
- parquetSourceFiles = e.extractParquetSourceFiles(parquetStats)
- }
- // Count only live log rows that haven't been converted to parquet
- liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles)
- if liveLogErr == nil {
- totalScannedRows += liveLogCount
- }
- // Note: Parquet files contribute 0 to scan count since we use their metadata/statistics
- }
- return totalScannedRows, nil
- }
- // findColumnValue performs case-insensitive lookup of column values
- // Now includes support for system columns stored in HybridScanResult
- func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value {
- // Check system columns first (stored separately in HybridScanResult)
- lowerColumnName := strings.ToLower(columnName)
- switch lowerColumnName {
- case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
- // For timestamp column, format as proper timestamp instead of raw nanoseconds
- timestamp := time.Unix(0, result.Timestamp) // time.Unix normalizes nanoseconds into seconds
- timestampStr := timestamp.UTC().Format("2006-01-02T15:04:05.000000000Z")
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: timestampStr}}
- case SW_COLUMN_NAME_KEY:
- return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
- case SW_COLUMN_NAME_SOURCE:
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}}
- }
- // Then check regular columns in Values map
- // First try exact match
- if value, exists := result.Values[columnName]; exists {
- return value
- }
- // Then try case-insensitive match
- for key, value := range result.Values {
- if strings.ToLower(key) == lowerColumnName {
- return value
- }
- }
- return nil
- }
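- // Lookup order sketch, assuming a record with a hypothetical user column "userId":
- // system columns first (by constant name), then exact match, then case-insensitive scan:
- //
- //    e.findColumnValue(result, SW_COLUMN_NAME_KEY) // BytesValue of the message key
- //    e.findColumnValue(result, "userId")           // exact match in result.Values
- //    e.findColumnValue(result, "USERID")           // case-insensitive fallback
- //    e.findColumnValue(result, "missing")          // nil -> rendered as SQL NULL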
- // discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
- func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
- // First, check if topic exists by trying to get its schema from the broker/filer
- recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
- if err != nil {
- return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
- }
- // Create a schema object from the discovered record type
- mqSchema := &schema.Schema{
- Namespace: database,
- Name: tableName,
- RecordType: recordType,
- RevisionId: 1, // Default to revision 1 for discovered topics
- }
- // Register the topic in the SQL catalog
- err = e.catalog.RegisterTopic(database, tableName, mqSchema)
- if err != nil {
- return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err)
- }
- // Discovery succeeded; the topic is now registered and available to queries
- return nil
- }
- // getArithmeticExpressionAlias generates a display alias for arithmetic expressions
- func (e *SQLEngine) getArithmeticExpressionAlias(expr *ArithmeticExpr) string {
- leftAlias := e.getExpressionAlias(expr.Left)
- rightAlias := e.getExpressionAlias(expr.Right)
- return leftAlias + expr.Operator + rightAlias
- }
- // getExpressionAlias generates an alias for any expression node
- func (e *SQLEngine) getExpressionAlias(expr ExprNode) string {
- switch exprType := expr.(type) {
- case *ColName:
- return exprType.Name.String()
- case *ArithmeticExpr:
- return e.getArithmeticExpressionAlias(exprType)
- case *SQLVal:
- return e.getSQLValAlias(exprType)
- default:
- return "expr"
- }
- }
- // evaluateArithmeticExpression evaluates an arithmetic expression for a given record
- func (e *SQLEngine) evaluateArithmeticExpression(expr *ArithmeticExpr, result HybridScanResult) (*schema_pb.Value, error) {
- // Check for timestamp arithmetic with intervals first
- if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") {
- return e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator)
- }
- // Get left operand value
- leftValue, err := e.evaluateExpressionValue(expr.Left, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating left operand: %v", err)
- }
- // Get right operand value
- rightValue, err := e.evaluateExpressionValue(expr.Right, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating right operand: %v", err)
- }
- // Handle string concatenation operator
- if expr.Operator == "||" {
- return e.Concat(leftValue, rightValue)
- }
- // Perform arithmetic operation
- var op ArithmeticOperator
- switch expr.Operator {
- case "+":
- op = OpAdd
- case "-":
- op = OpSub
- case "*":
- op = OpMul
- case "/":
- op = OpDiv
- case "%":
- op = OpMod
- default:
- return nil, fmt.Errorf("unsupported arithmetic operator: %s", expr.Operator)
- }
- return e.EvaluateArithmeticExpression(leftValue, rightValue, op)
- }
- // isTimestampArithmetic checks if an arithmetic operation involves timestamps and intervals
- func (e *SQLEngine) isTimestampArithmetic(left, right ExprNode) bool {
- // Check if left is a timestamp function (NOW, CURRENT_TIMESTAMP, etc.)
- leftIsTimestamp := e.isTimestampFunction(left)
- // Check if right is an interval
- rightIsInterval := e.isIntervalExpression(right)
- return leftIsTimestamp && rightIsInterval
- }
- // isTimestampFunction checks if an expression is a timestamp function
- func (e *SQLEngine) isTimestampFunction(expr ExprNode) bool {
- if funcExpr, ok := expr.(*FuncExpr); ok {
- funcName := strings.ToUpper(funcExpr.Name.String())
- return funcName == "NOW" || funcName == "CURRENT_TIMESTAMP" || funcName == "CURRENT_DATE" || funcName == "CURRENT_TIME"
- }
- return false
- }
- // isIntervalExpression checks if an expression is an interval
- func (e *SQLEngine) isIntervalExpression(expr ExprNode) bool {
- _, ok := expr.(*IntervalExpr)
- return ok
- }
- // evaluateExpressionValue evaluates any expression to get its value from a record
- func (e *SQLEngine) evaluateExpressionValue(expr ExprNode, result HybridScanResult) (*schema_pb.Value, error) {
- switch exprType := expr.(type) {
- case *ColName:
- columnName := exprType.Name.String()
- upperColumnName := strings.ToUpper(columnName)
- // Check if this is actually a string literal that was parsed as ColName
- if (strings.HasPrefix(columnName, "'") && strings.HasSuffix(columnName, "'")) ||
- (strings.HasPrefix(columnName, "\"") && strings.HasSuffix(columnName, "\"")) {
- // This is a string literal that was incorrectly parsed as a column name
- literal := strings.Trim(strings.Trim(columnName, "'"), "\"")
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}}, nil
- }
- // Check if this is actually a function call that was parsed as ColName
- if strings.Contains(columnName, "(") && strings.Contains(columnName, ")") {
- // This is a function call that was parsed incorrectly as a column name
- // We need to manually evaluate it as a function
- return e.evaluateColumnNameAsFunction(columnName, result)
- }
- // Check if this is a datetime constant
- if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
- upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
- switch upperColumnName {
- case FuncCURRENT_DATE:
- return e.CurrentDate()
- case FuncCURRENT_TIME:
- return e.CurrentTime()
- case FuncCURRENT_TIMESTAMP:
- return e.CurrentTimestamp()
- case FuncNOW:
- return e.Now()
- }
- }
- // Check if this is actually a numeric literal disguised as a column name
- if val, err := strconv.ParseInt(columnName, 10, 64); err == nil {
- return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}}, nil
- }
- if val, err := strconv.ParseFloat(columnName, 64); err == nil {
- return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}}, nil
- }
- // Otherwise, treat as a regular column lookup
- value := e.findColumnValue(result, columnName)
- if value == nil {
- return nil, nil
- }
- return value, nil
- case *ArithmeticExpr:
- return e.evaluateArithmeticExpression(exprType, result)
- case *SQLVal:
- // Handle literal values
- return e.convertSQLValToSchemaValue(exprType), nil
- case *FuncExpr:
- // Handle function calls that are part of arithmetic expressions
- funcName := strings.ToUpper(exprType.Name.String())
- // Route to appropriate function evaluator based on function type
- if e.isDateTimeFunction(funcName) {
- // Use datetime function evaluator
- return e.evaluateDateTimeFunction(exprType, result)
- } else {
- // Use string function evaluator
- return e.evaluateStringFunction(exprType, result)
- }
- case *IntervalExpr:
- // Handle interval expressions - evaluate as duration in nanoseconds
- nanos, err := e.evaluateInterval(exprType.Value)
- if err != nil {
- return nil, err
- }
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: nanos},
- }, nil
- default:
- return nil, fmt.Errorf("unsupported expression type: %T", expr)
- }
- }
- // convertSQLValToSchemaValue converts SQLVal literal to schema_pb.Value
- func (e *SQLEngine) convertSQLValToSchemaValue(sqlVal *SQLVal) *schema_pb.Value {
- switch sqlVal.Type {
- case IntVal:
- if val, err := strconv.ParseInt(string(sqlVal.Val), 10, 64); err == nil {
- return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}}
- }
- case FloatVal:
- if val, err := strconv.ParseFloat(string(sqlVal.Val), 64); err == nil {
- return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}}
- }
- case StrVal:
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}}
- }
- // Default to string if parsing fails
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}}
- }
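- // For example, IntVal "42" becomes Int64Value(42) and FloatVal "3.14" becomes
- // DoubleValue(3.14); anything unparseable falls through to a StringValue.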
- // ConvertToSQLResultWithExpressions converts HybridScanResults to SQL query results with expression evaluation
- func (e *SQLEngine) ConvertToSQLResultWithExpressions(hms *HybridMessageScanner, results []HybridScanResult, selectExprs []SelectExpr) *QueryResult {
- if len(results) == 0 {
- columns := make([]string, 0, len(selectExprs))
- for _, selectExpr := range selectExprs {
- switch expr := selectExpr.(type) {
- case *AliasedExpr:
- // Check if alias is available and use it
- if expr.As != nil && !expr.As.IsEmpty() {
- columns = append(columns, expr.As.String())
- } else {
- // Fall back to expression-based column naming
- switch col := expr.Expr.(type) {
- case *ColName:
- columnName := col.Name.String()
- upperColumnName := strings.ToUpper(columnName)
- // Check if this is an arithmetic expression embedded in a ColName
- if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
- columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
- } else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
- upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
- // Use lowercase for datetime constants in column headers
- columns = append(columns, strings.ToLower(columnName))
- } else {
- // Use display name for system columns
- displayName := e.getSystemColumnDisplayName(columnName)
- columns = append(columns, displayName)
- }
- case *ArithmeticExpr:
- columns = append(columns, e.getArithmeticExpressionAlias(col))
- case *FuncExpr:
- columns = append(columns, e.getStringFunctionAlias(col))
- case *SQLVal:
- columns = append(columns, e.getSQLValAlias(col))
- default:
- columns = append(columns, "expr")
- }
- }
- }
- }
- return &QueryResult{
- Columns: columns,
- Rows: [][]sqltypes.Value{},
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // Build columns from SELECT expressions
- columns := make([]string, 0, len(selectExprs))
- for _, selectExpr := range selectExprs {
- switch expr := selectExpr.(type) {
- case *AliasedExpr:
- // Check if alias is available and use it
- if expr.As != nil && !expr.As.IsEmpty() {
- columns = append(columns, expr.As.String())
- } else {
- // Fall back to expression-based column naming
- switch col := expr.Expr.(type) {
- case *ColName:
- columnName := col.Name.String()
- upperColumnName := strings.ToUpper(columnName)
- // Check if this is an arithmetic expression embedded in a ColName
- if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
- columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr))
- } else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
- upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
- // Use lowercase for datetime constants in column headers
- columns = append(columns, strings.ToLower(columnName))
- } else {
- columns = append(columns, columnName)
- }
- case *ArithmeticExpr:
- columns = append(columns, e.getArithmeticExpressionAlias(col))
- case *FuncExpr:
- columns = append(columns, e.getStringFunctionAlias(col))
- case *SQLVal:
- columns = append(columns, e.getSQLValAlias(col))
- default:
- columns = append(columns, "expr")
- }
- }
- }
- }
- // Convert to SQL rows with expression evaluation
- rows := make([][]sqltypes.Value, len(results))
- for i, result := range results {
- row := make([]sqltypes.Value, len(selectExprs))
- for j, selectExpr := range selectExprs {
- switch expr := selectExpr.(type) {
- case *AliasedExpr:
- switch col := expr.Expr.(type) {
- case *ColName:
- // Handle regular column, datetime constants, or arithmetic expressions
- columnName := col.Name.String()
- upperColumnName := strings.ToUpper(columnName)
- // Check if this is an arithmetic expression embedded in a ColName
- if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil {
- // Handle as arithmetic expression
- if value, err := e.evaluateArithmeticExpression(arithmeticExpr, result); err == nil && value != nil {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- } else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME ||
- upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW {
- // Handle as datetime function
- var value *schema_pb.Value
- var err error
- switch upperColumnName {
- case FuncCURRENT_DATE:
- value, err = e.CurrentDate()
- case FuncCURRENT_TIME:
- value, err = e.CurrentTime()
- case FuncCURRENT_TIMESTAMP:
- value, err = e.CurrentTimestamp()
- case FuncNOW:
- value, err = e.Now()
- }
- if err == nil && value != nil {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- } else {
- // Handle as regular column
- if value := e.findColumnValue(result, columnName); value != nil {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- }
- case *ArithmeticExpr:
- // Handle arithmetic expression
- if value, err := e.evaluateArithmeticExpression(col, result); err == nil && value != nil {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- case *FuncExpr:
- // Handle function - route to appropriate evaluator
- funcName := strings.ToUpper(col.Name.String())
- var value *schema_pb.Value
- var err error
- // Check if it's a datetime function
- if e.isDateTimeFunction(funcName) {
- value, err = e.evaluateDateTimeFunction(col, result)
- } else {
- // Default to string function evaluator
- value, err = e.evaluateStringFunction(col, result)
- }
- if err == nil && value != nil {
- row[j] = convertSchemaValueToSQL(value)
- } else {
- row[j] = sqltypes.NULL
- }
- case *SQLVal:
- // Handle literal value
- value := e.convertSQLValToSchemaValue(col)
- row[j] = convertSchemaValueToSQL(value)
- default:
- row[j] = sqltypes.NULL
- }
- default:
- row[j] = sqltypes.NULL
- }
- }
- rows[i] = row
- }
- return &QueryResult{
- Columns: columns,
- Rows: rows,
- Database: hms.topic.Namespace,
- Table: hms.topic.Name,
- }
- }
- // extractBaseColumns recursively extracts base column names from arithmetic expressions
- func (e *SQLEngine) extractBaseColumns(expr *ArithmeticExpr, baseColumnsSet map[string]bool) {
- // Extract columns from left operand
- e.extractBaseColumnsFromExpression(expr.Left, baseColumnsSet)
- // Extract columns from right operand
- e.extractBaseColumnsFromExpression(expr.Right, baseColumnsSet)
- }
- // extractBaseColumnsFromExpression extracts base column names from any expression node
- func (e *SQLEngine) extractBaseColumnsFromExpression(expr ExprNode, baseColumnsSet map[string]bool) {
- switch exprType := expr.(type) {
- case *ColName:
- columnName := exprType.Name.String()
- // Check if it's a literal number disguised as a column name
- if _, err := strconv.ParseInt(columnName, 10, 64); err != nil {
- if _, err := strconv.ParseFloat(columnName, 64); err != nil {
- // Not a numeric literal, treat as actual column name
- baseColumnsSet[columnName] = true
- }
- }
- case *ArithmeticExpr:
- // Recursively handle nested arithmetic expressions
- e.extractBaseColumns(exprType, baseColumnsSet)
- }
- }
- // isAggregationFunction checks if a function name is an aggregation function
- func (e *SQLEngine) isAggregationFunction(funcName string) bool {
- // Convert to uppercase for case-insensitive comparison
- upperFuncName := strings.ToUpper(funcName)
- switch upperFuncName {
- case FuncCOUNT, FuncSUM, FuncAVG, FuncMIN, FuncMAX:
- return true
- default:
- return false
- }
- }
- // isStringFunction checks if a function name is a string function
- func (e *SQLEngine) isStringFunction(funcName string) bool {
- switch funcName {
- case FuncUPPER, FuncLOWER, FuncLENGTH, FuncTRIM, FuncBTRIM, FuncLTRIM, FuncRTRIM, FuncSUBSTRING, FuncLEFT, FuncRIGHT, FuncCONCAT:
- return true
- default:
- return false
- }
- }
- // isDateTimeFunction checks if a function name is a datetime function
- func (e *SQLEngine) isDateTimeFunction(funcName string) bool {
- switch funcName {
- case FuncCURRENT_DATE, FuncCURRENT_TIME, FuncCURRENT_TIMESTAMP, FuncNOW, FuncEXTRACT, FuncDATE_TRUNC:
- return true
- default:
- return false
- }
- }
- // getStringFunctionAlias generates an alias for string functions
- func (e *SQLEngine) getStringFunctionAlias(funcExpr *FuncExpr) string {
- funcName := funcExpr.Name.String()
- if len(funcExpr.Exprs) == 1 {
- if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
- if colName, ok := aliasedExpr.Expr.(*ColName); ok {
- return fmt.Sprintf("%s(%s)", funcName, colName.Name.String())
- }
- }
- }
- return fmt.Sprintf("%s(...)", funcName)
- }
- // getDateTimeFunctionAlias generates an alias for datetime functions
- func (e *SQLEngine) getDateTimeFunctionAlias(funcExpr *FuncExpr) string {
- funcName := funcExpr.Name.String()
- // Handle zero-argument functions like CURRENT_DATE, NOW
- if len(funcExpr.Exprs) == 0 {
- // Use lowercase for datetime constants in column headers
- return strings.ToLower(funcName)
- }
- // Handle EXTRACT function specially to create unique aliases
- if strings.ToUpper(funcName) == FuncEXTRACT && len(funcExpr.Exprs) == 2 {
- // Try to extract the date part to make the alias unique
- if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
- if sqlVal, ok := aliasedExpr.Expr.(*SQLVal); ok && sqlVal.Type == StrVal {
- datePart := strings.ToLower(string(sqlVal.Val))
- return fmt.Sprintf("extract_%s", datePart)
- }
- }
- // Fallback to generic if we can't extract the date part
- return fmt.Sprintf("%s(...)", funcName)
- }
- // All other multi-argument functions (e.g. DATE_TRUNC) use the generic alias
- return fmt.Sprintf("%s(...)", funcName)
- }
- // extractBaseColumnsFromFunction extracts base columns needed by a string function
- func (e *SQLEngine) extractBaseColumnsFromFunction(funcExpr *FuncExpr, baseColumnsSet map[string]bool) {
- for _, expr := range funcExpr.Exprs {
- if aliasedExpr, ok := expr.(*AliasedExpr); ok {
- e.extractBaseColumnsFromExpression(aliasedExpr.Expr, baseColumnsSet)
- }
- }
- }
- // getSQLValAlias generates an alias for SQL literal values
- func (e *SQLEngine) getSQLValAlias(sqlVal *SQLVal) string {
- switch sqlVal.Type {
- case StrVal:
- // Escape single quotes by replacing ' with '' (SQL standard escaping)
- escapedVal := strings.ReplaceAll(string(sqlVal.Val), "'", "''")
- return fmt.Sprintf("'%s'", escapedVal)
- case IntVal:
- return string(sqlVal.Val)
- case FloatVal:
- return string(sqlVal.Val)
- default:
- return "literal"
- }
- }
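- // Examples: StrVal it's -> 'it''s' (embedded quote doubled), IntVal 42 -> 42,
- // FloatVal 3.14 -> 3.14.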
- // evaluateStringFunction evaluates a string function for a given record
- func (e *SQLEngine) evaluateStringFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) {
- funcName := strings.ToUpper(funcExpr.Name.String())
- // The string functions handled here take exactly 1 argument
- if len(funcExpr.Exprs) != 1 {
- return nil, fmt.Errorf("function %s expects exactly 1 argument", funcName)
- }
- // Get the argument value
- var argValue *schema_pb.Value
- if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
- var err error
- argValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating function argument: %v", err)
- }
- } else {
- return nil, fmt.Errorf("unsupported function argument type")
- }
- if argValue == nil {
- return nil, nil // NULL input produces NULL output
- }
- // Call the appropriate string function
- switch funcName {
- case FuncUPPER:
- return e.Upper(argValue)
- case FuncLOWER:
- return e.Lower(argValue)
- case FuncLENGTH:
- return e.Length(argValue)
- case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM
- return e.Trim(argValue)
- case FuncLTRIM:
- return e.LTrim(argValue)
- case FuncRTRIM:
- return e.RTrim(argValue)
- default:
- return nil, fmt.Errorf("unsupported string function: %s", funcName)
- }
- }
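- // Usage sketch, with the argument already evaluated to a schema_pb.Value:
- //
- //    UPPER('abc')   -> e.Upper -> "ABC"
- //    TRIM('  hi  ') -> e.Trim  -> "hi" (TRIM and BTRIM share one path)
- //    LOWER(NULL)    -> nil, nil (NULL in, NULL out)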
- // evaluateDateTimeFunction evaluates a datetime function for a given record
- func (e *SQLEngine) evaluateDateTimeFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) {
- funcName := strings.ToUpper(funcExpr.Name.String())
- switch funcName {
- case FuncEXTRACT:
- // EXTRACT requires exactly 2 arguments: date part and value
- if len(funcExpr.Exprs) != 2 {
- return nil, fmt.Errorf("EXTRACT function expects exactly 2 arguments (date_part, value), got %d", len(funcExpr.Exprs))
- }
- // Get the first argument (date part)
- var datePartValue *schema_pb.Value
- if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
- var err error
- datePartValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating EXTRACT date part argument: %v", err)
- }
- } else {
- return nil, fmt.Errorf("unsupported EXTRACT date part argument type")
- }
- if datePartValue == nil {
- return nil, fmt.Errorf("EXTRACT date part cannot be NULL")
- }
- // Convert date part to string
- var datePart string
- if stringVal, ok := datePartValue.Kind.(*schema_pb.Value_StringValue); ok {
- datePart = strings.ToUpper(stringVal.StringValue)
- } else {
- return nil, fmt.Errorf("EXTRACT date part must be a string")
- }
- // Get the second argument (value to extract from)
- var extractValue *schema_pb.Value
- if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok {
- var err error
- extractValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating EXTRACT value argument: %v", err)
- }
- } else {
- return nil, fmt.Errorf("unsupported EXTRACT value argument type")
- }
- if extractValue == nil {
- return nil, nil // NULL input produces NULL output
- }
- // Call the Extract function
- return e.Extract(DatePart(datePart), extractValue)
- case FuncDATE_TRUNC:
- // DATE_TRUNC requires exactly 2 arguments: precision and value
- if len(funcExpr.Exprs) != 2 {
- return nil, fmt.Errorf("DATE_TRUNC function expects exactly 2 arguments (precision, value), got %d", len(funcExpr.Exprs))
- }
- // Get the first argument (precision)
- var precisionValue *schema_pb.Value
- if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok {
- var err error
- precisionValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating DATE_TRUNC precision argument: %v", err)
- }
- } else {
- return nil, fmt.Errorf("unsupported DATE_TRUNC precision argument type")
- }
- if precisionValue == nil {
- return nil, fmt.Errorf("DATE_TRUNC precision cannot be NULL")
- }
- // Convert precision to string
- var precision string
- if stringVal, ok := precisionValue.Kind.(*schema_pb.Value_StringValue); ok {
- precision = stringVal.StringValue
- } else {
- return nil, fmt.Errorf("DATE_TRUNC precision must be a string")
- }
- // Get the second argument (value to truncate)
- var truncateValue *schema_pb.Value
- if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok {
- var err error
- truncateValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating DATE_TRUNC value argument: %v", err)
- }
- } else {
- return nil, fmt.Errorf("unsupported DATE_TRUNC value argument type")
- }
- if truncateValue == nil {
- return nil, nil // NULL input produces NULL output
- }
- // Call the DateTrunc function
- return e.DateTrunc(precision, truncateValue)
- case FuncCURRENT_DATE:
- // CURRENT_DATE is a zero-argument function
- if len(funcExpr.Exprs) != 0 {
- return nil, fmt.Errorf("CURRENT_DATE function expects no arguments, got %d", len(funcExpr.Exprs))
- }
- return e.CurrentDate()
- case FuncCURRENT_TIME:
- // CURRENT_TIME is a zero-argument function
- if len(funcExpr.Exprs) != 0 {
- return nil, fmt.Errorf("CURRENT_TIME function expects no arguments, got %d", len(funcExpr.Exprs))
- }
- return e.CurrentTime()
- case FuncCURRENT_TIMESTAMP:
- // CURRENT_TIMESTAMP is a zero-argument function
- if len(funcExpr.Exprs) != 0 {
- return nil, fmt.Errorf("CURRENT_TIMESTAMP function expects no arguments, got %d", len(funcExpr.Exprs))
- }
- return e.CurrentTimestamp()
- case FuncNOW:
- // NOW is a zero-argument function, invoked as NOW()
- if len(funcExpr.Exprs) != 0 {
- return nil, fmt.Errorf("NOW function expects no arguments, got %d", len(funcExpr.Exprs))
- }
- return e.Now()
- // PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(date)
- default:
- return nil, fmt.Errorf("unsupported datetime function: %s", funcName)
- }
- }
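- // Routing sketch, using a hypothetical timestamp column ts_col (valueOf stands
- // in for evaluateExpressionValue):
- //
- //    EXTRACT('YEAR', ts_col)    -> e.Extract(DatePart("YEAR"), valueOf(ts_col))
- //    DATE_TRUNC('hour', ts_col) -> e.DateTrunc("hour", valueOf(ts_col))
- //    NOW(), CURRENT_DATE, ...   -> the zero-argument constructors above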
- // evaluateInterval parses an interval string and returns duration in nanoseconds
- func (e *SQLEngine) evaluateInterval(intervalValue string) (int64, error) {
- // Parse interval strings like "1 hour", "30 minutes", "2 days"
- parts := strings.Fields(strings.TrimSpace(intervalValue))
- if len(parts) != 2 {
- return 0, fmt.Errorf("invalid interval format: %s (expected 'number unit')", intervalValue)
- }
- // Parse the numeric value
- value, err := strconv.ParseInt(parts[0], 10, 64)
- if err != nil {
- return 0, fmt.Errorf("invalid interval value: %s", parts[0])
- }
- // Parse the unit and convert to nanoseconds
- unit := strings.ToLower(parts[1])
- var multiplier int64
- switch unit {
- case "nanosecond", "nanoseconds", "ns":
- multiplier = 1
- case "microsecond", "microseconds", "us":
- multiplier = 1000
- case "millisecond", "milliseconds", "ms":
- multiplier = 1000000
- case "second", "seconds", "s":
- multiplier = 1000000000
- case "minute", "minutes", "m":
- multiplier = 60 * 1000000000
- case "hour", "hours", "h":
- multiplier = 60 * 60 * 1000000000
- case "day", "days", "d":
- multiplier = 24 * 60 * 60 * 1000000000
- case "week", "weeks", "w":
- multiplier = 7 * 24 * 60 * 60 * 1000000000
- default:
- return 0, fmt.Errorf("unsupported interval unit: %s", unit)
- }
- return value * multiplier, nil
- }
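- // Worked examples for the unit table above:
- //
- //    e.evaluateInterval("1 hour")     // 3_600_000_000_000 ns
- //    e.evaluateInterval("90 minutes") // 5_400_000_000_000 ns
- //    e.evaluateInterval("2 days")     // 172_800_000_000_000 ns
- //    e.evaluateInterval("1.5 hours")  // error: the value must be an integer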
- // convertValueForTimestampColumn converts string timestamp values to nanoseconds for system timestamp columns
- func (e *SQLEngine) convertValueForTimestampColumn(columnName string, value interface{}, expr ExprNode) interface{} {
- // Special handling for timestamp system columns
- if columnName == SW_COLUMN_NAME_TIMESTAMP {
- if _, ok := value.(string); ok {
- if timeNanos := e.extractTimeValue(expr); timeNanos != 0 {
- return timeNanos
- }
- }
- }
- return value
- }
- // evaluateTimestampArithmetic performs arithmetic operations with timestamps and intervals
- func (e *SQLEngine) evaluateTimestampArithmetic(left, right ExprNode, operator string) (*schema_pb.Value, error) {
- // Handle timestamp arithmetic: NOW() - INTERVAL '1 hour'
- // For timestamp arithmetic, we don't need the result context, so we pass an empty one
- emptyResult := HybridScanResult{}
- leftValue, err := e.evaluateExpressionValue(left, emptyResult)
- if err != nil {
- return nil, fmt.Errorf("failed to evaluate left operand: %v", err)
- }
- rightValue, err := e.evaluateExpressionValue(right, emptyResult)
- if err != nil {
- return nil, fmt.Errorf("failed to evaluate right operand: %v", err)
- }
- // Convert left operand (should be timestamp)
- var leftTimestamp int64
- if leftValue.Kind != nil {
- switch leftKind := leftValue.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- leftTimestamp = leftKind.Int64Value
- case *schema_pb.Value_TimestampValue:
- // Convert microseconds to nanoseconds
- leftTimestamp = leftKind.TimestampValue.TimestampMicros * 1000
- case *schema_pb.Value_StringValue:
- // Parse timestamp string
- if ts, err := time.Parse(time.RFC3339, leftKind.StringValue); err == nil {
- leftTimestamp = ts.UnixNano()
- } else if ts, err := time.Parse("2006-01-02 15:04:05", leftKind.StringValue); err == nil {
- leftTimestamp = ts.UnixNano()
- } else {
- return nil, fmt.Errorf("invalid timestamp format: %s", leftKind.StringValue)
- }
- default:
- return nil, fmt.Errorf("left operand must be a timestamp, got: %T", leftKind)
- }
- } else {
- return nil, fmt.Errorf("left operand value is nil")
- }
- // Convert right operand (should be interval in nanoseconds)
- var intervalNanos int64
- if rightValue.Kind != nil {
- switch rightKind := rightValue.Kind.(type) {
- case *schema_pb.Value_Int64Value:
- intervalNanos = rightKind.Int64Value
- default:
- return nil, fmt.Errorf("right operand must be an interval duration")
- }
- } else {
- return nil, fmt.Errorf("right operand value is nil")
- }
- // Perform arithmetic
- var resultTimestamp int64
- switch operator {
- case "+":
- resultTimestamp = leftTimestamp + intervalNanos
- case "-":
- resultTimestamp = leftTimestamp - intervalNanos
- default:
- return nil, fmt.Errorf("unsupported timestamp arithmetic operator: %s", operator)
- }
- // Return as timestamp
- return &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: resultTimestamp},
- }, nil
- }
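- // e.g. NOW() - INTERVAL '1 hour': the left operand resolves to the current
- // Unix time in nanoseconds, the right to 3_600_000_000_000, and the result
- // is their difference as an Int64Value of nanoseconds.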
- // evaluateColumnNameAsFunction handles function calls that were incorrectly parsed as column names
- func (e *SQLEngine) evaluateColumnNameAsFunction(columnName string, result HybridScanResult) (*schema_pb.Value, error) {
- // Simple parser for basic function calls like TRIM('hello world')
- // Extract function name and argument
- parenPos := strings.Index(columnName, "(")
- if parenPos == -1 {
- return nil, fmt.Errorf("invalid function format: %s", columnName)
- }
- funcName := strings.ToUpper(strings.TrimSpace(columnName[:parenPos]))
- argsString := columnName[parenPos+1:]
- // Find the last closing parenthesis (sufficient for the simple, possibly nested, arguments handled here)
- closeParen := strings.LastIndex(argsString, ")")
- if closeParen == -1 {
- return nil, fmt.Errorf("missing closing parenthesis in function: %s", columnName)
- }
- argString := strings.TrimSpace(argsString[:closeParen])
- // Parse the argument - for now handle simple cases
- var argValue *schema_pb.Value
- var err error
- if strings.HasPrefix(argString, "'") && strings.HasSuffix(argString, "'") {
- // String literal argument
- literal := strings.Trim(argString, "'")
- argValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}}
- } else if strings.Contains(argString, "(") && strings.Contains(argString, ")") {
- // Nested function call - recursively evaluate it
- argValue, err = e.evaluateColumnNameAsFunction(argString, result)
- if err != nil {
- return nil, fmt.Errorf("error evaluating nested function argument: %v", err)
- }
- } else {
- // Column name or other expression
- return nil, fmt.Errorf("unsupported argument type in function: %s", argString)
- }
- if argValue == nil {
- return nil, nil
- }
- // Call the appropriate function
- switch funcName {
- case FuncUPPER:
- return e.Upper(argValue)
- case FuncLOWER:
- return e.Lower(argValue)
- case FuncLENGTH:
- return e.Length(argValue)
- case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM
- return e.Trim(argValue)
- case FuncLTRIM:
- return e.LTrim(argValue)
- case FuncRTRIM:
- return e.RTrim(argValue)
- // PostgreSQL-only: Use EXTRACT(YEAR FROM date) instead of YEAR(date)
- default:
- return nil, fmt.Errorf("unsupported function in column name: %s", funcName)
- }
- }
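- // Inputs this fallback parser accepts (and one it rejects):
- //
- //    "TRIM('  hi  ')"        -> e.Trim -> "hi"
- //    "UPPER(TRIM('  hi  '))" -> nested call, evaluated recursively -> "HI"
- //    "LENGTH(name)"          -> error: bare column arguments are not supported here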
- // parseColumnLevelCalculation detects and parses arithmetic expressions that contain function calls
- // This handles cases where the SQL parser incorrectly treats "LENGTH('hello') + 10" as a single ColName
- func (e *SQLEngine) parseColumnLevelCalculation(expression string) *ArithmeticExpr {
- // First check if this looks like an arithmetic expression
- if !e.containsArithmeticOperator(expression) {
- return nil
- }
- // Build AST for the arithmetic expression
- return e.buildArithmeticAST(expression)
- }
- // containsArithmeticOperator checks if the expression contains arithmetic operators outside of function calls
- func (e *SQLEngine) containsArithmeticOperator(expr string) bool {
- operators := []string{"+", "-", "*", "/", "%", "||"}
- parenLevel := 0
- quoteLevel := false
- for i, char := range expr {
- switch char {
- case '(':
- if !quoteLevel {
- parenLevel++
- }
- case ')':
- if !quoteLevel {
- parenLevel--
- }
- case '\'':
- quoteLevel = !quoteLevel
- default:
- // Only check for operators outside of parentheses and quotes
- if parenLevel == 0 && !quoteLevel {
- for _, op := range operators {
- if strings.HasPrefix(expr[i:], op) {
- return true
- }
- }
- }
- }
- }
- return false
- }
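- // For example:
- //
- //    containsArithmeticOperator("LENGTH('a+b')")     // false: '+' is inside quotes and parens
- //    containsArithmeticOperator("LENGTH(name) + 10") // true: top-level '+'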
- // buildArithmeticAST builds an Abstract Syntax Tree for arithmetic expressions containing function calls
- func (e *SQLEngine) buildArithmeticAST(expr string) *ArithmeticExpr {
- // Remove leading/trailing spaces
- expr = strings.TrimSpace(expr)
- // Try precedence groups from lowest to highest; splitting at the rightmost
- // top-level operator of the lowest-precedence group yields a left-associative
- // tree (e.g. "10 * 3 / 4" becomes (10 * 3) / 4, not 10 * (3 / 4))
- precedenceGroups := [][]string{{"||"}, {"+", "-"}, {"*", "/", "%"}}
- for _, group := range precedenceGroups {
- opPos, opLen := -1, 0
- mainOp := ""
- for _, op := range group {
- if pos := e.findMainOperator(expr, op); pos > opPos {
- opPos, opLen, mainOp = pos, len(op), op
- }
- }
- if opPos == -1 {
- continue
- }
- leftExpr := strings.TrimSpace(expr[:opPos])
- rightExpr := strings.TrimSpace(expr[opPos+opLen:])
- if leftExpr != "" && rightExpr != "" {
- return &ArithmeticExpr{
- Left: e.parseASTExpressionNode(leftExpr),
- Right: e.parseASTExpressionNode(rightExpr),
- Operator: mainOp,
- }
- }
- }
- return nil
- }
- // findMainOperator finds the position of an operator occurrence that's not inside
- // parentheses or quotes. It returns the last top-level occurrence so that chains of
- // equal-precedence operators parse left-associatively, e.g. "100 / 10 / 5" splits
- // into "100 / 10" and "5" rather than "100" and "10 / 5"
- func (e *SQLEngine) findMainOperator(expr string, operator string) int {
- parenLevel := 0
- quoteLevel := false
- lastPos := -1
- for i := 0; i <= len(expr)-len(operator); i++ {
- char := expr[i]
- switch char {
- case '(':
- if !quoteLevel {
- parenLevel++
- }
- case ')':
- if !quoteLevel {
- parenLevel--
- }
- case '\'':
- quoteLevel = !quoteLevel
- default:
- // Record occurrences only at top level (not inside parentheses or quotes)
- if parenLevel == 0 && !quoteLevel && strings.HasPrefix(expr[i:], operator) {
- lastPos = i
- }
- }
- }
- return lastPos
- }
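- // For example:
- //
- //    e.findMainOperator("10 - 4 - 3", "-")    // 7: the rightmost top-level '-'
- //    e.findMainOperator("LENGTH('a-b')", "-") // -1: every '-' is nested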
- // parseASTExpressionNode parses an expression into the appropriate ExprNode type
- func (e *SQLEngine) parseASTExpressionNode(expr string) ExprNode {
- expr = strings.TrimSpace(expr)
- // Check if it's a function call (contains parentheses)
- if strings.Contains(expr, "(") && strings.Contains(expr, ")") {
- // This should be parsed as a function expression, but since our SQL parser
- // has limitations, we'll create a special ColName that represents the function
- return &ColName{Name: stringValue(expr)}
- }
- // Check if it's a numeric literal
- if _, err := strconv.ParseInt(expr, 10, 64); err == nil {
- return &SQLVal{Type: IntVal, Val: []byte(expr)}
- }
- if _, err := strconv.ParseFloat(expr, 64); err == nil {
- return &SQLVal{Type: FloatVal, Val: []byte(expr)}
- }
- // Check if it's a string literal
- if strings.HasPrefix(expr, "'") && strings.HasSuffix(expr, "'") {
- return &SQLVal{Type: StrVal, Val: []byte(strings.Trim(expr, "'"))}
- }
- // Check for nested arithmetic expressions
- if nestedArithmetic := e.buildArithmeticAST(expr); nestedArithmetic != nil {
- return nestedArithmetic
- }
- // Default to column name
- return &ColName{Name: stringValue(expr)}
- }
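- // End-to-end sketch: parseColumnLevelCalculation("LENGTH('hello') + 10")
- // produces an ArithmeticExpr with Left = ColName("LENGTH('hello')"),
- // Operator = "+", Right = SQLVal(IntVal, "10"); evaluation then routes the
- // ColName through evaluateColumnNameAsFunction, yielding 5 + 10 = 15.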