broker_client.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. package engine
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/cluster"
  11. "github.com/seaweedfs/seaweedfs/weed/filer"
  12. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  13. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/credentials/insecure"
  21. jsonpb "google.golang.org/protobuf/encoding/protojson"
  22. )
// BrokerClient handles communication with the SeaweedFS MQ broker.
// Implements BrokerClientInterface for production use.
//
// Assumptions:
//  1. Service discovery goes through the master server: the master is asked
//     for a filer, and the filer is asked for the broker balancer.
//  2. gRPC connections are dialed on demand; the discovery RPCs in this file
//     run with a 10-second timeout.
//  3. Topics and namespaces are managed via the SeaweedMessaging service.
type BrokerClient struct {
	// masterAddress is the gRPC address of the master server.
	masterAddress string
	// filerAddress is the gRPC address of the discovered filer; it stays
	// empty until discoverFiler succeeds.
	filerAddress string
	// brokerAddress is the address of the broker balancer; it stays empty
	// until findBrokerBalancer succeeds.
	brokerAddress string
	// grpcDialOption is the dial option passed to grpc.Dial.
	grpcDialOption grpc.DialOption
}
  35. // NewBrokerClient creates a new MQ broker client
  36. // Uses master HTTP address and converts it to gRPC address for service discovery
  37. func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
  38. // Convert HTTP address to gRPC address (typically HTTP port + 10000)
  39. masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
  40. return &BrokerClient{
  41. masterAddress: masterGRPCAddress,
  42. grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  43. }
  44. }
  45. // convertHTTPToGRPC converts HTTP address to gRPC address
  46. // Follows SeaweedFS convention: gRPC port = HTTP port + 10000
  47. func convertHTTPToGRPC(httpAddress string) string {
  48. if strings.Contains(httpAddress, ":") {
  49. parts := strings.Split(httpAddress, ":")
  50. if len(parts) == 2 {
  51. if port, err := strconv.Atoi(parts[1]); err == nil {
  52. return fmt.Sprintf("%s:%d", parts[0], port+10000)
  53. }
  54. }
  55. }
  56. // Fallback: return original address if conversion fails
  57. return httpAddress
  58. }
  59. // discoverFiler finds a filer from the master server
  60. func (c *BrokerClient) discoverFiler() error {
  61. if c.filerAddress != "" {
  62. return nil // already discovered
  63. }
  64. conn, err := grpc.Dial(c.masterAddress, c.grpcDialOption)
  65. if err != nil {
  66. return fmt.Errorf("failed to connect to master at %s: %v", c.masterAddress, err)
  67. }
  68. defer conn.Close()
  69. client := master_pb.NewSeaweedClient(conn)
  70. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  71. defer cancel()
  72. resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
  73. ClientType: cluster.FilerType,
  74. })
  75. if err != nil {
  76. return fmt.Errorf("failed to list filers from master: %v", err)
  77. }
  78. if len(resp.ClusterNodes) == 0 {
  79. return fmt.Errorf("no filers found in cluster")
  80. }
  81. // Use the first available filer and convert HTTP address to gRPC
  82. filerHTTPAddress := resp.ClusterNodes[0].Address
  83. c.filerAddress = convertHTTPToGRPC(filerHTTPAddress)
  84. return nil
  85. }
  86. // findBrokerBalancer discovers the broker balancer using filer lock mechanism
  87. // First discovers filer from master, then uses filer to find broker balancer
  88. func (c *BrokerClient) findBrokerBalancer() error {
  89. if c.brokerAddress != "" {
  90. return nil // already found
  91. }
  92. // First discover filer from master
  93. if err := c.discoverFiler(); err != nil {
  94. return fmt.Errorf("failed to discover filer: %v", err)
  95. }
  96. conn, err := grpc.Dial(c.filerAddress, c.grpcDialOption)
  97. if err != nil {
  98. return fmt.Errorf("failed to connect to filer at %s: %v", c.filerAddress, err)
  99. }
  100. defer conn.Close()
  101. client := filer_pb.NewSeaweedFilerClient(conn)
  102. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  103. defer cancel()
  104. resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{
  105. Name: pub_balancer.LockBrokerBalancer,
  106. })
  107. if err != nil {
  108. return fmt.Errorf("failed to find broker balancer: %v", err)
  109. }
  110. c.brokerAddress = resp.Owner
  111. return nil
  112. }
  113. // GetFilerClient creates a filer client for accessing MQ data files
  114. // Discovers filer from master if not already known
  115. func (c *BrokerClient) GetFilerClient() (filer_pb.FilerClient, error) {
  116. // Ensure filer is discovered
  117. if err := c.discoverFiler(); err != nil {
  118. return nil, fmt.Errorf("failed to discover filer: %v", err)
  119. }
  120. return &filerClientImpl{
  121. filerAddress: c.filerAddress,
  122. grpcDialOption: c.grpcDialOption,
  123. }, nil
  124. }
// filerClientImpl implements the filer_pb.FilerClient interface for MQ data
// access. It holds only connection parameters; WithFilerClient dials a new
// connection on each call.
type filerClientImpl struct {
	// filerAddress is the gRPC address of the filer to dial.
	filerAddress string
	// grpcDialOption is the dial option passed to grpc.Dial.
	grpcDialOption grpc.DialOption
}
  130. // WithFilerClient executes a function with a connected filer client
  131. func (f *filerClientImpl) WithFilerClient(followRedirect bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
  132. conn, err := grpc.Dial(f.filerAddress, f.grpcDialOption)
  133. if err != nil {
  134. return fmt.Errorf("failed to connect to filer at %s: %v", f.filerAddress, err)
  135. }
  136. defer conn.Close()
  137. client := filer_pb.NewSeaweedFilerClient(conn)
  138. return fn(client)
  139. }
  140. // AdjustedUrl implements the FilerClient interface (placeholder implementation)
  141. func (f *filerClientImpl) AdjustedUrl(location *filer_pb.Location) string {
  142. return location.Url
  143. }
  144. // GetDataCenter implements the FilerClient interface (placeholder implementation)
  145. func (f *filerClientImpl) GetDataCenter() string {
  146. // Return empty string as we don't have data center information for this simple client
  147. return ""
  148. }
  149. // ListNamespaces retrieves all MQ namespaces (databases) from the filer
  150. // RESOLVED: Now queries actual topic directories instead of hardcoded values
  151. func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
  152. // Get filer client to list directories under /topics
  153. filerClient, err := c.GetFilerClient()
  154. if err != nil {
  155. return []string{}, fmt.Errorf("failed to get filer client: %v", err)
  156. }
  157. var namespaces []string
  158. err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  159. // List directories under /topics to get namespaces
  160. request := &filer_pb.ListEntriesRequest{
  161. Directory: "/topics", // filer.TopicsDir constant value
  162. }
  163. stream, streamErr := client.ListEntries(ctx, request)
  164. if streamErr != nil {
  165. return fmt.Errorf("failed to list topics directory: %v", streamErr)
  166. }
  167. for {
  168. resp, recvErr := stream.Recv()
  169. if recvErr != nil {
  170. if recvErr == io.EOF {
  171. break // End of stream
  172. }
  173. return fmt.Errorf("failed to receive entry: %v", recvErr)
  174. }
  175. // Only include directories (namespaces), skip files
  176. if resp.Entry != nil && resp.Entry.IsDirectory {
  177. namespaces = append(namespaces, resp.Entry.Name)
  178. }
  179. }
  180. return nil
  181. })
  182. if err != nil {
  183. return []string{}, fmt.Errorf("failed to list namespaces from /topics: %v", err)
  184. }
  185. // Return actual namespaces found (may be empty if no topics exist)
  186. return namespaces, nil
  187. }
  188. // ListTopics retrieves all topics in a namespace from the filer
  189. // RESOLVED: Now queries actual topic directories instead of hardcoded values
  190. func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
  191. // Get filer client to list directories under /topics/{namespace}
  192. filerClient, err := c.GetFilerClient()
  193. if err != nil {
  194. // Return empty list if filer unavailable - no fallback sample data
  195. return []string{}, nil
  196. }
  197. var topics []string
  198. err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  199. // List directories under /topics/{namespace} to get topics
  200. namespaceDir := fmt.Sprintf("/topics/%s", namespace)
  201. request := &filer_pb.ListEntriesRequest{
  202. Directory: namespaceDir,
  203. }
  204. stream, streamErr := client.ListEntries(ctx, request)
  205. if streamErr != nil {
  206. return fmt.Errorf("failed to list namespace directory %s: %v", namespaceDir, streamErr)
  207. }
  208. for {
  209. resp, recvErr := stream.Recv()
  210. if recvErr != nil {
  211. if recvErr == io.EOF {
  212. break // End of stream
  213. }
  214. return fmt.Errorf("failed to receive entry: %v", recvErr)
  215. }
  216. // Only include directories (topics), skip files
  217. if resp.Entry != nil && resp.Entry.IsDirectory {
  218. topics = append(topics, resp.Entry.Name)
  219. }
  220. }
  221. return nil
  222. })
  223. if err != nil {
  224. // Return empty list if directory listing fails - no fallback sample data
  225. return []string{}, nil
  226. }
  227. // Return actual topics found (may be empty if no topics exist in namespace)
  228. return topics, nil
  229. }
  230. // GetTopicSchema retrieves schema information for a specific topic
  231. // Reads the actual schema from topic configuration stored in filer
  232. func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) {
  233. // Get filer client to read topic configuration
  234. filerClient, err := c.GetFilerClient()
  235. if err != nil {
  236. return nil, fmt.Errorf("failed to get filer client: %v", err)
  237. }
  238. var recordType *schema_pb.RecordType
  239. err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  240. // Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
  241. topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
  242. // First check if topic directory exists
  243. _, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
  244. Directory: topicDir,
  245. Name: "topic.conf",
  246. })
  247. if err != nil {
  248. return fmt.Errorf("topic %s.%s not found: %v", namespace, topicName, err)
  249. }
  250. // Read the topic.conf file content
  251. data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
  252. if err != nil {
  253. return fmt.Errorf("failed to read topic.conf for %s.%s: %v", namespace, topicName, err)
  254. }
  255. // Parse the configuration
  256. conf := &mq_pb.ConfigureTopicResponse{}
  257. if err = jsonpb.Unmarshal(data, conf); err != nil {
  258. return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err)
  259. }
  260. // Extract the record type (schema)
  261. if conf.RecordType != nil {
  262. recordType = conf.RecordType
  263. } else {
  264. return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName)
  265. }
  266. return nil
  267. })
  268. if err != nil {
  269. return nil, err
  270. }
  271. if recordType == nil {
  272. return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName)
  273. }
  274. return recordType, nil
  275. }
  276. // ConfigureTopic creates or modifies a topic configuration
  277. // Assumption: Uses existing ConfigureTopic gRPC method for topic management
  278. func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
  279. if err := c.findBrokerBalancer(); err != nil {
  280. return err
  281. }
  282. conn, err := grpc.Dial(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
  283. if err != nil {
  284. return fmt.Errorf("failed to connect to broker at %s: %v", c.brokerAddress, err)
  285. }
  286. defer conn.Close()
  287. client := mq_pb.NewSeaweedMessagingClient(conn)
  288. // Create topic configuration
  289. _, err = client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
  290. Topic: &schema_pb.Topic{
  291. Namespace: namespace,
  292. Name: topicName,
  293. },
  294. PartitionCount: partitionCount,
  295. RecordType: recordType,
  296. })
  297. if err != nil {
  298. return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err)
  299. }
  300. return nil
  301. }
  302. // DeleteTopic removes a topic and all its data
  303. // Assumption: There's a delete/drop topic method (may need to be implemented in broker)
  304. func (c *BrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error {
  305. if err := c.findBrokerBalancer(); err != nil {
  306. return err
  307. }
  308. // TODO: Implement topic deletion
  309. // This may require a new gRPC method in the broker service
  310. return fmt.Errorf("topic deletion not yet implemented in broker - need to add DeleteTopic gRPC method")
  311. }
  312. // ListTopicPartitions discovers the actual partitions for a given topic via MQ broker
  313. func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topicName string) ([]topic.Partition, error) {
  314. if err := c.findBrokerBalancer(); err != nil {
  315. // Fallback to default partition when broker unavailable
  316. return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
  317. }
  318. // Get topic configuration to determine actual partitions
  319. topicObj := topic.Topic{Namespace: namespace, Name: topicName}
  320. // Use filer client to read topic configuration
  321. filerClient, err := c.GetFilerClient()
  322. if err != nil {
  323. // Fallback to default partition
  324. return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
  325. }
  326. var topicConf *mq_pb.ConfigureTopicResponse
  327. err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  328. topicConf, err = topicObj.ReadConfFile(client)
  329. return err
  330. })
  331. if err != nil {
  332. // Topic doesn't exist or can't read config, use default
  333. return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
  334. }
  335. // Generate partitions based on topic configuration
  336. partitionCount := int32(4) // Default partition count for topics
  337. if len(topicConf.BrokerPartitionAssignments) > 0 {
  338. partitionCount = int32(len(topicConf.BrokerPartitionAssignments))
  339. }
  340. // Create partition ranges - simplified approach
  341. // Each partition covers an equal range of the hash space
  342. rangeSize := topic.PartitionCount / partitionCount
  343. var partitions []topic.Partition
  344. for i := int32(0); i < partitionCount; i++ {
  345. rangeStart := i * rangeSize
  346. rangeStop := (i + 1) * rangeSize
  347. if i == partitionCount-1 {
  348. // Last partition covers remaining range
  349. rangeStop = topic.PartitionCount
  350. }
  351. partitions = append(partitions, topic.Partition{
  352. RangeStart: rangeStart,
  353. RangeStop: rangeStop,
  354. RingSize: topic.PartitionCount,
  355. UnixTimeNs: time.Now().UnixNano(),
  356. })
  357. }
  358. return partitions, nil
  359. }
  360. // GetUnflushedMessages returns only messages that haven't been flushed to disk yet
  361. // Uses buffer_start metadata from disk files for precise deduplication
  362. // This prevents double-counting when combining with disk-based data
  363. func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
  364. // Step 1: Find the broker that hosts this partition
  365. if err := c.findBrokerBalancer(); err != nil {
  366. // Return empty slice if we can't find broker - prevents double-counting
  367. return []*filer_pb.LogEntry{}, nil
  368. }
  369. // Step 2: Connect to broker
  370. conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
  371. if err != nil {
  372. // Return empty slice if connection fails - prevents double-counting
  373. return []*filer_pb.LogEntry{}, nil
  374. }
  375. defer conn.Close()
  376. client := mq_pb.NewSeaweedMessagingClient(conn)
  377. // Step 3: Get earliest buffer_start from disk files for precise deduplication
  378. topicObj := topic.Topic{Namespace: namespace, Name: topicName}
  379. partitionPath := topic.PartitionDir(topicObj, partition)
  380. earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
  381. if err != nil {
  382. // If we can't get buffer info, use 0 (get all unflushed data)
  383. earliestBufferIndex = 0
  384. }
  385. // Step 4: Prepare request using buffer index filtering only
  386. request := &mq_pb.GetUnflushedMessagesRequest{
  387. Topic: &schema_pb.Topic{
  388. Namespace: namespace,
  389. Name: topicName,
  390. },
  391. Partition: &schema_pb.Partition{
  392. RingSize: partition.RingSize,
  393. RangeStart: partition.RangeStart,
  394. RangeStop: partition.RangeStop,
  395. UnixTimeNs: partition.UnixTimeNs,
  396. },
  397. StartBufferIndex: earliestBufferIndex,
  398. }
  399. // Step 5: Call the broker streaming API
  400. stream, err := client.GetUnflushedMessages(ctx, request)
  401. if err != nil {
  402. // Return empty slice if gRPC call fails - prevents double-counting
  403. return []*filer_pb.LogEntry{}, nil
  404. }
  405. // Step 5: Receive streaming responses
  406. var logEntries []*filer_pb.LogEntry
  407. for {
  408. response, err := stream.Recv()
  409. if err != nil {
  410. // End of stream or error - return what we have to prevent double-counting
  411. break
  412. }
  413. // Handle error messages
  414. if response.Error != "" {
  415. // Log the error but return empty slice - prevents double-counting
  416. // (In debug mode, this would be visible)
  417. return []*filer_pb.LogEntry{}, nil
  418. }
  419. // Check for end of stream
  420. if response.EndOfStream {
  421. break
  422. }
  423. // Convert and collect the message
  424. if response.Message != nil {
  425. logEntries = append(logEntries, &filer_pb.LogEntry{
  426. TsNs: response.Message.TsNs,
  427. Key: response.Message.Key,
  428. Data: response.Message.Data,
  429. PartitionKeyHash: int32(response.Message.PartitionKeyHash), // Convert uint32 to int32
  430. })
  431. }
  432. }
  433. return logEntries, nil
  434. }
  435. // getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition
  436. //
  437. // This method handles three scenarios for seamless broker querying:
  438. // 1. Live log files exist: Uses their buffer_start metadata (most recent boundaries)
  439. // 2. Only Parquet files exist: Uses Parquet buffer_start metadata (preserved from archived sources)
  440. // 3. Mixed files: Uses earliest buffer_start from all sources for comprehensive coverage
  441. //
  442. // This ensures continuous real-time querying capability even after log file compaction/archival
  443. func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) {
  444. filerClient, err := c.GetFilerClient()
  445. if err != nil {
  446. return 0, fmt.Errorf("failed to get filer client: %v", err)
  447. }
  448. var earliestBufferIndex int64 = -1 // -1 means no buffer_start found
  449. var logFileCount, parquetFileCount int
  450. var bufferStartSources []string // Track which files provide buffer_start
  451. err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
  452. // Skip directories
  453. if entry.IsDirectory {
  454. return nil
  455. }
  456. // Count file types for scenario detection
  457. if strings.HasSuffix(entry.Name, ".parquet") {
  458. parquetFileCount++
  459. } else {
  460. logFileCount++
  461. }
  462. // Extract buffer_start from file extended attributes (both log files and parquet files)
  463. bufferStart := c.getBufferStartFromEntry(entry)
  464. if bufferStart != nil && bufferStart.StartIndex > 0 {
  465. if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex {
  466. earliestBufferIndex = bufferStart.StartIndex
  467. }
  468. bufferStartSources = append(bufferStartSources, entry.Name)
  469. }
  470. return nil
  471. })
  472. // Debug: Show buffer_start determination logic in EXPLAIN mode
  473. if isDebugMode(ctx) && len(bufferStartSources) > 0 {
  474. if logFileCount == 0 && parquetFileCount > 0 {
  475. fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
  476. } else if logFileCount > 0 && parquetFileCount > 0 {
  477. fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
  478. logFileCount, parquetFileCount, bufferStartSources)
  479. } else {
  480. fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
  481. }
  482. fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
  483. }
  484. if err != nil {
  485. return 0, fmt.Errorf("failed to scan partition directory: %v", err)
  486. }
  487. if earliestBufferIndex == -1 {
  488. return 0, fmt.Errorf("no buffer_start metadata found in partition")
  489. }
  490. return earliestBufferIndex, nil
  491. }
  492. // getBufferStartFromEntry extracts LogBufferStart from file entry metadata
  493. // Only supports binary format (used by both log files and Parquet files)
  494. func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart {
  495. if entry.Extended == nil {
  496. return nil
  497. }
  498. if startData, exists := entry.Extended["buffer_start"]; exists {
  499. // Only support binary format
  500. if len(startData) == 8 {
  501. startIndex := int64(binary.BigEndian.Uint64(startData))
  502. if startIndex > 0 {
  503. return &LogBufferStart{StartIndex: startIndex}
  504. }
  505. }
  506. }
  507. return nil
  508. }