subscribe_session.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package agent_client
import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
  11. type SubscribeOption struct {
  12. ConsumerGroup string
  13. ConsumerGroupInstanceId string
  14. Topic topic.Topic
  15. OffsetType schema_pb.OffsetType
  16. OffsetTsNs int64
  17. Filter string
  18. MaxSubscribedPartitions int32
  19. SlidingWindowSize int32
  20. }
  21. type SubscribeSession struct {
  22. Option *SubscribeOption
  23. stream grpc.BidiStreamingClient[mq_agent_pb.SubscribeRecordRequest, mq_agent_pb.SubscribeRecordResponse]
  24. }
  25. func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*SubscribeSession, error) {
  26. // call local agent grpc server to create a new session
  27. clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
  28. if err != nil {
  29. return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
  30. }
  31. agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
  32. initRequest := &mq_agent_pb.SubscribeRecordRequest_InitSubscribeRecordRequest{
  33. ConsumerGroup: option.ConsumerGroup,
  34. ConsumerGroupInstanceId: option.ConsumerGroupInstanceId,
  35. Topic: &schema_pb.Topic{
  36. Namespace: option.Topic.Namespace,
  37. Name: option.Topic.Name,
  38. },
  39. OffsetType: option.OffsetType,
  40. OffsetTsNs: option.OffsetTsNs,
  41. MaxSubscribedPartitions: option.MaxSubscribedPartitions,
  42. Filter: option.Filter,
  43. SlidingWindowSize: option.SlidingWindowSize,
  44. }
  45. stream, err := agentClient.SubscribeRecord(context.Background())
  46. if err != nil {
  47. return nil, fmt.Errorf("subscribe record: %w", err)
  48. }
  49. if err = stream.Send(&mq_agent_pb.SubscribeRecordRequest{
  50. Init: initRequest,
  51. }); err != nil {
  52. return nil, fmt.Errorf("send session id: %w", err)
  53. }
  54. return &SubscribeSession{
  55. Option: option,
  56. stream: stream,
  57. }, nil
  58. }
  59. func (s *SubscribeSession) CloseSession() error {
  60. err := s.stream.CloseSend()
  61. return err
  62. }
  63. func (a *SubscribeSession) SubscribeMessageRecord(
  64. onEachMessageFn func(key []byte, record *schema_pb.RecordValue),
  65. onCompletionFn func()) error {
  66. for {
  67. resp, err := a.stream.Recv()
  68. if err != nil {
  69. return err
  70. }
  71. onEachMessageFn(resp.Key, resp.Value)
  72. }
  73. if onCompletionFn != nil {
  74. onCompletionFn()
  75. }
  76. return nil
  77. }