local_partition.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. package topic
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/codes"
  14. "google.golang.org/grpc/status"
  15. )
// LocalPartition is the broker-side state kept for a single topic partition:
// its log buffer, its publisher and subscriber registries, and an optional
// replication stream to a follower broker.
type LocalPartition struct {
	// ListenersWaits is read with atomic.LoadInt64 by the log buffer notify
	// callback installed in NewLocalPartition; ListenersCond is broadcast only
	// while it is positive. Its writers are not in this file's visible code —
	// presumably it counts subscribers blocked on ListenersCond.
	// NOTE(review): this and AckTsNs are accessed through sync/atomic and sit
	// first in the struct, presumably to keep them 64-bit aligned on 32-bit
	// platforms — confirm before reordering fields.
	ListenersWaits int64
	// AckTsNs is stored atomically: Publish writes the message TsNs when no
	// follower is connected, otherwise the follower ack goroutine writes the
	// AckTsNs reported by the follower.
	AckTsNs int64
	// notifying clients
	// ListenersLock is the mutex that ListenersCond is built on.
	ListenersLock sync.Mutex
	// ListenersCond is created in NewLocalPartition and broadcast from the log
	// buffer notify callback.
	ListenersCond *sync.Cond
	// Partition is embedded; its UnixTimeNs, RangeStart and RangeStop fields
	// are used to name the log buffer.
	Partition
	// LogBuffer holds the partition's messages and is created in
	// NewLocalPartition.
	LogBuffer *log_buffer.LogBuffer
	// Publishers is the registry of publishers attached to this partition.
	Publishers *LocalPartitionPublishers
	// Subscribers is the registry of subscribers attached to this partition.
	Subscribers *LocalPartitionSubscribers
	// publishFolloweMeStream (sic) is the stream used to replicate messages to
	// the follower; it is nil while no follower is connected.
	publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
	// followerGrpcConnection is the gRPC connection the follower stream was
	// opened on.
	followerGrpcConnection *grpc.ClientConn
	// Follower is the follower broker address taken from the publisher's init
	// message; it is reset to "" when the follower stream is closed.
	Follower string
}
  30. var TIME_FORMAT = "2006-01-02-15-04-05"
  31. var PartitionGenerationFormat = "v2006-01-02-15-04-05"
  32. func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
  33. lp := &LocalPartition{
  34. Partition: partition,
  35. Publishers: NewLocalPartitionPublishers(),
  36. Subscribers: NewLocalPartitionSubscribers(),
  37. }
  38. lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
  39. lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
  40. 2*time.Minute, logFlushFn, readFromDiskFn, func() {
  41. if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
  42. lp.ListenersCond.Broadcast()
  43. }
  44. })
  45. return lp
  46. }
  47. func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
  48. p.LogBuffer.AddToBuffer(message)
  49. // maybe send to the follower
  50. if p.publishFolloweMeStream != nil {
  51. // println("recv", string(message.Key), message.TsNs)
  52. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  53. Message: &mq_pb.PublishFollowMeRequest_Data{
  54. Data: message,
  55. },
  56. }); followErr != nil {
  57. return fmt.Errorf("send to follower %s: %v", p.Follower, followErr)
  58. }
  59. } else {
  60. atomic.StoreInt64(&p.AckTsNs, message.TsNs)
  61. }
  62. return nil
  63. }
  64. func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
  65. onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error {
  66. var processedPosition log_buffer.MessagePosition
  67. var readPersistedLogErr error
  68. var readInMemoryLogErr error
  69. var isDone bool
  70. for {
  71. processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
  72. if readPersistedLogErr != nil {
  73. glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
  74. return readPersistedLogErr
  75. }
  76. if isDone {
  77. return nil
  78. }
  79. if processedPosition.Time.UnixNano() != 0 {
  80. startPosition = processedPosition
  81. }
  82. processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
  83. if isDone {
  84. return nil
  85. }
  86. if processedPosition.Time.UnixNano() != 0 {
  87. startPosition = processedPosition
  88. }
  89. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  90. continue
  91. }
  92. if readInMemoryLogErr != nil {
  93. glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
  94. return readInMemoryLogErr
  95. }
  96. }
  97. }
  98. func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time {
  99. return p.LogBuffer.GetEarliestTime()
  100. }
  101. func (p *LocalPartition) HasData() bool {
  102. return !p.LogBuffer.GetEarliestTime().IsZero()
  103. }
  104. func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition {
  105. return p.LogBuffer.GetEarliestPosition()
  106. }
  107. func (p *LocalPartition) closePublishers() {
  108. p.Publishers.SignalShutdown()
  109. }
  110. func (p *LocalPartition) closeSubscribers() {
  111. p.Subscribers.SignalShutdown()
  112. }
  113. func (p *LocalPartition) WaitUntilNoPublishers() {
  114. for {
  115. if p.Publishers.Size() == 0 {
  116. return
  117. }
  118. time.Sleep(113 * time.Millisecond)
  119. }
  120. }
  121. func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
  122. if p.publishFolloweMeStream != nil {
  123. return nil
  124. }
  125. if initMessage.FollowerBroker == "" {
  126. return nil
  127. }
  128. p.Follower = initMessage.FollowerBroker
  129. ctx := context.Background()
  130. p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.Follower, true, grpcDialOption)
  131. if err != nil {
  132. return fmt.Errorf("fail to dial %s: %v", p.Follower, err)
  133. }
  134. followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
  135. p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
  136. if err != nil {
  137. return fmt.Errorf("fail to create publish client: %w", err)
  138. }
  139. if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  140. Message: &mq_pb.PublishFollowMeRequest_Init{
  141. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  142. Topic: initMessage.Topic,
  143. Partition: initMessage.Partition,
  144. },
  145. },
  146. }); err != nil {
  147. return err
  148. }
  149. // start receiving ack from follower
  150. go func() {
  151. defer func() {
  152. // println("stop receiving ack from follower")
  153. }()
  154. for {
  155. ack, err := p.publishFolloweMeStream.Recv()
  156. if err != nil {
  157. e, _ := status.FromError(err)
  158. if e.Code() == codes.Canceled {
  159. glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
  160. return
  161. }
  162. glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
  163. return
  164. }
  165. atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
  166. // println("recv ack", ack.AckTsNs)
  167. }
  168. }()
  169. return nil
  170. }
  171. func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
  172. if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
  173. p.LogBuffer.ShutdownLogBuffer()
  174. for !p.LogBuffer.IsAllFlushed() {
  175. time.Sleep(113 * time.Millisecond)
  176. }
  177. if p.publishFolloweMeStream != nil {
  178. // send close to the follower
  179. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  180. Message: &mq_pb.PublishFollowMeRequest_Close{
  181. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  182. },
  183. }); followErr != nil {
  184. glog.Errorf("Error closing follower stream: %v", followErr)
  185. }
  186. glog.V(4).Infof("closing grpcConnection to follower")
  187. p.followerGrpcConnection.Close()
  188. p.publishFolloweMeStream = nil
  189. p.Follower = ""
  190. }
  191. hasShutdown = true
  192. }
  193. glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
  194. return
  195. }
  196. func (p *LocalPartition) Shutdown() {
  197. p.closePublishers()
  198. p.closeSubscribers()
  199. p.LogBuffer.ShutdownLogBuffer()
  200. glog.V(0).Infof("local partition %v shutting down", p.Partition)
  201. }
  202. func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
  203. if p.publishFolloweMeStream != nil {
  204. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  205. Message: &mq_pb.PublishFollowMeRequest_Flush{
  206. Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
  207. TsNs: flushTsNs,
  208. },
  209. },
  210. }); followErr != nil {
  211. glog.Errorf("send follower %s flush message: %v", p.Follower, followErr)
  212. }
  213. // println("notifying", p.Follower, "flushed at", flushTsNs)
  214. }
  215. }