| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package agent_client
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/schema"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- )
- type PublishSession struct {
- schema *schema.Schema
- partitionCount int
- publisherName string
- stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse]
- }
- func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
- // call local agent grpc server to create a new session
- clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
- }
- agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn)
- resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{
- Topic: &schema_pb.Topic{
- Namespace: topicSchema.Namespace,
- Name: topicSchema.Name,
- },
- PartitionCount: int32(partitionCount),
- RecordType: topicSchema.RecordType,
- PublisherName: publisherName,
- })
- if err != nil {
- return nil, err
- }
- if resp.Error != "" {
- return nil, fmt.Errorf("start publish session: %v", resp.Error)
- }
- stream, err := agentClient.PublishRecord(context.Background())
- if err != nil {
- return nil, fmt.Errorf("publish record: %w", err)
- }
- if err = stream.Send(&mq_agent_pb.PublishRecordRequest{
- SessionId: resp.SessionId,
- }); err != nil {
- return nil, fmt.Errorf("send session id: %w", err)
- }
- return &PublishSession{
- schema: topicSchema,
- partitionCount: partitionCount,
- publisherName: publisherName,
- stream: stream,
- }, nil
- }
- func (a *PublishSession) CloseSession() error {
- if a.schema == nil {
- return nil
- }
- err := a.stream.CloseSend()
- if err != nil {
- return fmt.Errorf("close send: %w", err)
- }
- a.schema = nil
- return err
- }
- func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error {
- return a.stream.Send(&mq_agent_pb.PublishRecordRequest{
- Key: key,
- Value: record,
- })
- }
|