log_to_parquet.go

package logstore

import (
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"github.com/parquet-go/parquet-go"
	"github.com/parquet-go/parquet-go/compress/zstd"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
	"google.golang.org/protobuf/proto"
)

const (
	SW_COLUMN_NAME_TS  = "_ts_ns"
	SW_COLUMN_NAME_KEY = "_key"
)
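
// CompactTopicPartitions converts the log files of every partition of a topic
// that are older than timeAgo into Parquet files.
//
// Usage sketch (illustrative values; assumes a connected filer_pb.FilerClient
// and a recordType that describes the topic's schema):
//
//	t := topic.Topic{Namespace: "default", Name: "events"}
//	err := CompactTopicPartitions(filerClient, t, 3*time.Hour, recordType,
//		&operation.StoragePreference{Replication: "000", Collection: "topics"})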
func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
	// list the topic partition versions
	topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
	if err != nil {
		return fmt.Errorf("list topic files: %w", err)
	}

	// compact the partitions
	for _, topicVersion := range topicVersions {
		partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
		if err != nil {
			return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
		}
		for _, partition := range partitions {
			err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
			if err != nil {
				return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
			}
		}
	}
	return nil
}
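
// collectTopicVersions lists the topic version directories under the topic
// directory that are older than timeAgo.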
func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
		t, err := topic.ParseTopicVersion(entry.Name)
		if err != nil {
			// skip non-partition directories
			return nil
		}
		if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
			partitionVersions = append(partitionVersions, t)
		}
		return nil
	})
	return
}
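
// collectTopicVersionsPartitions lists the partition directories under one
// topic version directory.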
func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
	version := topicVersion.Format(topic.PartitionGenerationFormat)
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
		if !entry.IsDirectory {
			return nil
		}
		start, stop := topic.ParsePartitionBoundary(entry.Name)
		if start != stop {
			partitions = append(partitions, topic.Partition{
				RangeStart: start,
				RangeStop:  stop,
				RingSize:   topic.PartitionCount,
				UnixTimeNs: topicVersion.UnixNano(),
			})
		}
		return nil
	})
	return
}

func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
	partitionDir := topic.PartitionDir(t, partition)

	// compact the partition directory
	return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
}
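
// compactTopicPartitionDir converts the uncompacted log files in one partition
// directory into Parquet files, grouping the log files into batches of roughly
// 128 MB before writing.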
func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
	// read all existing parquet files
	minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
	if err != nil {
		return err
	}

	// read all log files
	logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
	if err != nil {
		return err
	}
	if len(logFiles) == 0 {
		return nil
	}

	// divide log files into groups of 128MB
	logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)

	// write to parquet file
	parquetLevels, err := schema.ToParquetLevels(recordType)
	if err != nil {
		return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
	}

	// create a parquet schema
	parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
	if err != nil {
		return fmt.Errorf("ToParquetSchema failed: %w", err)
	}

	// TODO parallelize the writing
	for _, logFileGroup := range logFileGroups {
		if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
			return err
		}
	}

	return nil
}
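
// groupFilesBySize splits the log files into consecutive groups of roughly
// maxGroupSize bytes; each group is later written as one Parquet file.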
func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
	var logFileGroup []*filer_pb.Entry
	var groupSize int64
	for _, logFile := range logFiles {
		// flush the current group when the next file would push it past maxGroupSize;
		// the length check avoids emitting an empty group when a single file alone exceeds maxGroupSize
		if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize && len(logFileGroup) > 0 {
			logFileGroups = append(logFileGroups, logFileGroup)
			logFileGroup = nil
			groupSize = 0
		}
		logFileGroup = append(logFileGroup, logFile)
		groupSize += int64(logFile.Attributes.FileSize)
	}
	if len(logFileGroup) > 0 {
		logFileGroups = append(logFileGroups, logFileGroup)
	}
	return
}
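
// readAllLogFiles lists the uncompacted log files in the partition directory,
// skipping parquet files, files newer than timeAgo, and files whose timestamps
// are already covered by existing parquet files.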
func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
		if strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
			return nil
		}
		logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
		if err != nil {
			// glog.Warningf("parse log time %s: %v", entry.Name, err)
			return nil
		}
		if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
			return nil
		}
		logFiles = append(logFiles, entry)
		return nil
	})
	return
}
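
// readAllParquetFiles scans the existing parquet files in the partition
// directory and returns the overall min/max timestamps recorded in their
// extended attributes.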
func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
	err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
		if !strings.HasSuffix(entry.Name, ".parquet") {
			return nil
		}
		if len(entry.Extended) == 0 {
			return nil
		}

		// read min ts
		minTsBytes := entry.Extended["min"]
		if len(minTsBytes) != 8 {
			return nil
		}
		minTs := int64(binary.BigEndian.Uint64(minTsBytes))
		if minTsNs == 0 || minTs < minTsNs {
			minTsNs = minTs
		}

		// read max ts
		maxTsBytes := entry.Extended["max"]
		if len(maxTsBytes) != 8 {
			return nil
		}
		maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
		if maxTsNs == 0 || maxTs > maxTsNs {
			maxTsNs = maxTs
		}
		return nil
	})
	return
}
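
// writeLogFilesToParquet reads one group of log files, converts their log
// entries into parquet rows (adding the _ts_ns and _key system columns), and
// writes the result as a single parquet file back into the partition directory.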
func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
	tempFile, err := os.CreateTemp(".", "t*.parquet")
	if err != nil {
		return fmt.Errorf("create temp file: %w", err)
	}
	defer func() {
		tempFile.Close()
		os.Remove(tempFile.Name())
	}()

	// Enable column statistics for fast aggregation queries
	writer := parquet.NewWriter(tempFile, parquetSchema,
		parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}),
		parquet.DataPageStatistics(true), // Enable column statistics
	)
	rowBuilder := parquet.NewRowBuilder(parquetSchema)

	var startTsNs, stopTsNs int64

	for _, logFile := range logFileGroups {
		var rows []parquet.Row
		if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
			// Skip control entries without actual data (same logic as read operations)
			if isControlEntry(entry) {
				return nil
			}

			if startTsNs == 0 {
				startTsNs = entry.TsNs
			}
			stopTsNs = entry.TsNs

			// write to parquet file
			rowBuilder.Reset()

			record := &schema_pb.RecordValue{}
			if err := proto.Unmarshal(entry.Data, record); err != nil {
				return fmt.Errorf("unmarshal record value: %w", err)
			}

			// Initialize Fields map if nil (prevents nil map assignment panic)
			if record.Fields == nil {
				record.Fields = make(map[string]*schema_pb.Value)
			}

			record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
				Kind: &schema_pb.Value_Int64Value{
					Int64Value: entry.TsNs,
				},
			}
			// Handle nil key bytes to prevent growslice panic in parquet-go
			keyBytes := entry.Key
			if keyBytes == nil {
				keyBytes = []byte{} // Use empty slice instead of nil
			}
			record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
				Kind: &schema_pb.Value_BytesValue{
					BytesValue: keyBytes,
				},
			}

			if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
				return fmt.Errorf("add record value: %w", err)
			}

			// Build row and normalize any nil ByteArray values to empty slices
			row := rowBuilder.Row()
			for i, value := range row {
				if value.Kind() == parquet.ByteArray {
					if value.ByteArray() == nil {
						row[i] = parquet.ByteArrayValue([]byte{})
					}
				}
			}
			rows = append(rows, row)

			return nil
		}); err != nil {
			return fmt.Errorf("iterate log entry %v/%v: %w", partitionDir, logFile.Name, err)
		}

		// Nil ByteArray handling is done during row creation
		// Write all rows in a single call
		if _, err := writer.WriteRows(rows); err != nil {
			return fmt.Errorf("write rows: %w", err)
		}
	}

	if err := writer.Close(); err != nil {
		return fmt.Errorf("close writer: %w", err)
	}

	// write the parquet file to partitionDir
	parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))

	// Collect source log file names and buffer_start metadata for deduplication
	var sourceLogFiles []string
	var earliestBufferStart int64
	for _, logFile := range logFileGroups {
		sourceLogFiles = append(sourceLogFiles, logFile.Name)
		// Extract buffer_start from log file metadata
		if bufferStart := getBufferStartFromLogFile(logFile); bufferStart > 0 {
			if earliestBufferStart == 0 || bufferStart < earliestBufferStart {
				earliestBufferStart = bufferStart
			}
		}
	}

	if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs, sourceLogFiles, earliestBufferStart); err != nil {
		return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
	}

	return nil
}
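
// saveParquetFileToPartitionDir uploads the local parquet file in 4 MB chunks
// and creates the corresponding filer entry, recording min/max timestamps,
// source log files, and the earliest buffer_start in the entry's extended
// attributes.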
func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64, sourceLogFiles []string, earliestBufferStart int64) error {
	uploader, err := operation.NewUploader()
	if err != nil {
		return fmt.Errorf("new uploader: %w", err)
	}

	// get file size
	fileInfo, err := sourceFile.Stat()
	if err != nil {
		return fmt.Errorf("stat source file: %w", err)
	}

	// upload file in chunks
	chunkSize := int64(4 * 1024 * 1024)
	chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize

	entry := &filer_pb.Entry{
		Name: parquetFileName,
		Attributes: &filer_pb.FuseAttributes{
			Crtime:   time.Now().Unix(),
			Mtime:    time.Now().Unix(),
			FileMode: uint32(os.FileMode(0644)),
			FileSize: uint64(fileInfo.Size()),
			Mime:     "application/vnd.apache.parquet",
		},
	}
	entry.Extended = make(map[string][]byte)
	minTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
	entry.Extended["min"] = minTsBytes
	maxTsBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
	entry.Extended["max"] = maxTsBytes

	// Store source log files for deduplication (JSON-encoded list)
	if len(sourceLogFiles) > 0 {
		sourceLogFilesJson, _ := json.Marshal(sourceLogFiles)
		entry.Extended["sources"] = sourceLogFilesJson
	}

	// Store earliest buffer_start for precise broker deduplication
	if earliestBufferStart > 0 {
		bufferStartBytes := make([]byte, 8)
		binary.BigEndian.PutUint64(bufferStartBytes, uint64(earliestBufferStart))
		entry.Extended["buffer_start"] = bufferStartBytes
	}

	for i := int64(0); i < chunkCount; i++ {
		fileId, uploadResult, err, _ := uploader.UploadWithRetry(
			filerClient,
			&filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: preference.Replication,
				Collection:  preference.Collection,
				TtlSec:      0, // TODO set ttl
				DiskType:    preference.DiskType,
				Path:        partitionDir + "/" + parquetFileName,
			},
			&operation.UploadOption{
				Filename:          parquetFileName,
				Cipher:            false,
				IsInputCompressed: false,
				MimeType:          "application/vnd.apache.parquet",
				PairMap:           nil,
			},
			func(host, fileId string) string {
				return fmt.Sprintf("http://%s/%s", host, fileId)
			},
			io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
		)
		if err != nil {
			return fmt.Errorf("upload chunk %d: %v", i, err)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload result: %v", uploadResult.Error)
		}
		entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
	}

	// write the entry to partitionDir
	if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
			Directory: partitionDir,
			Entry:     entry,
		})
	}); err != nil {
		return fmt.Errorf("create entry: %w", err)
	}

	return nil
}
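
// iterateLogEntries streams every log entry stored in one log file and invokes
// eachLogEntryFn for each of them.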
func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
	lookupFn := filer.LookupFn(filerClient)
	_, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
		if err := eachLogEntryFn(logEntry); err != nil {
			return true, err
		}
		return false, nil
	})
	return err
}
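
// eachFile downloads each chunk of a log file from the volume servers and
// decodes the log entries it contains, calling eachLogEntryFn for every entry.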
func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(ctx context.Context, fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
	if len(entry.Content) > 0 {
		// skip .offset files
		return
	}
	var urlStrings []string
	for _, chunk := range entry.Chunks {
		if chunk.Size == 0 {
			continue
		}
		if chunk.IsChunkManifest {
			return
		}
		urlStrings, err = lookupFileIdFn(context.Background(), chunk.FileId)
		if err != nil {
			err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
			return
		}
		if len(urlStrings) == 0 {
			err = fmt.Errorf("no url found for %s", chunk.FileId)
			return
		}

		// try each url until util_http.Get(urlString) succeeds
		var processed bool
		for _, urlString := range urlStrings {
			var data []byte
			if data, _, err = util_http.Get(urlString); err == nil {
				processed = true
				if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
					return
				}
				break
			}
		}
		if !processed {
			err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
			return
		}
	}
	return
}
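
// eachChunk walks one downloaded chunk, which is a sequence of size-prefixed
// (4-byte header) protobuf-encoded LogEntry records, and calls eachLogEntryFn
// on each decoded entry.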
func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
	for pos := 0; pos+4 < len(buf); {
		size := util.BytesToUint32(buf[pos : pos+4])
		if pos+4+int(size) > len(buf) {
			err = fmt.Errorf("read each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
			return
		}
		entryData := buf[pos+4 : pos+4+int(size)]

		logEntry := &filer_pb.LogEntry{}
		if err = proto.Unmarshal(entryData, logEntry); err != nil {
			pos += 4 + int(size)
			err = fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %w", err)
			return
		}

		if _, err = eachLogEntryFn(logEntry); err != nil {
			err = fmt.Errorf("process log entry %v: %w", logEntry, err)
			return
		}

		processedTsNs = logEntry.TsNs

		pos += 4 + int(size)
	}
	return
}

// getBufferStartFromLogFile extracts the buffer_start index from log file extended metadata
func getBufferStartFromLogFile(logFile *filer_pb.Entry) int64 {
	if logFile.Extended == nil {
		return 0
	}

	// Parse buffer_start binary format
	if startData, exists := logFile.Extended["buffer_start"]; exists {
		if len(startData) == 8 {
			startIndex := int64(binary.BigEndian.Uint64(startData))
			if startIndex > 0 {
				return startIndex
			}
		}
	}

	return 0
}