tail_volume.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package operation
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "google.golang.org/grpc"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. )
  11. func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
  12. // find volume location, replication, ttl info
  13. lookup, err := LookupVolumeId(masterFn, grpcDialOption, vid.String())
  14. if err != nil {
  15. return fmt.Errorf("look up volume %d: %v", vid, err)
  16. }
  17. if len(lookup.Locations) == 0 {
  18. return fmt.Errorf("unable to locate volume %d", vid)
  19. }
  20. volumeServer := lookup.Locations[0].ServerAddress()
  21. return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
  22. }
  23. func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
  24. return WithVolumeServerClient(true, volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  25. ctx, cancel := context.WithCancel(context.Background())
  26. defer cancel()
  27. stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
  28. VolumeId: uint32(vid),
  29. SinceNs: sinceNs,
  30. IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
  31. })
  32. if err != nil {
  33. return err
  34. }
  35. for {
  36. resp, recvErr := stream.Recv()
  37. if recvErr != nil {
  38. if recvErr == io.EOF {
  39. break
  40. } else {
  41. return recvErr
  42. }
  43. }
  44. needleHeader := resp.NeedleHeader
  45. needleBody := resp.NeedleBody
  46. version := needle.Version(resp.Version)
  47. if version == 0 {
  48. version = needle.GetCurrentVersion()
  49. }
  50. if len(needleHeader) == 0 {
  51. continue
  52. }
  53. for !resp.IsLastChunk {
  54. resp, recvErr = stream.Recv()
  55. if recvErr != nil {
  56. if recvErr == io.EOF {
  57. break
  58. } else {
  59. return recvErr
  60. }
  61. }
  62. needleBody = append(needleBody, resp.NeedleBody...)
  63. }
  64. n := new(needle.Needle)
  65. n.ParseNeedleHeader(needleHeader)
  66. err = n.ReadNeedleBodyBytes(needleBody, version)
  67. if err != nil {
  68. return err
  69. }
  70. err = fn(n)
  71. if err != nil {
  72. return err
  73. }
  74. }
  75. return nil
  76. })
  77. }