local_partition_subscribers.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package topic
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. type LocalPartitionSubscribers struct {
  8. Subscribers map[string]*LocalSubscriber
  9. SubscribersLock sync.RWMutex
  10. }
  11. type LocalSubscriber struct {
  12. connectTimeNs int64 // accessed atomically
  13. lastSeenTimeNs int64 // accessed atomically
  14. lastReceivedOffset int64 // accessed atomically - offset of last message received
  15. lastAckedOffset int64 // accessed atomically - offset of last message acknowledged
  16. stopCh chan struct{}
  17. }
  18. func NewLocalSubscriber() *LocalSubscriber {
  19. now := time.Now().UnixNano()
  20. subscriber := &LocalSubscriber{
  21. stopCh: make(chan struct{}, 1),
  22. }
  23. atomic.StoreInt64(&subscriber.connectTimeNs, now)
  24. atomic.StoreInt64(&subscriber.lastSeenTimeNs, now)
  25. atomic.StoreInt64(&subscriber.lastReceivedOffset, 0)
  26. atomic.StoreInt64(&subscriber.lastAckedOffset, 0)
  27. return subscriber
  28. }
  29. func (p *LocalSubscriber) SignalShutdown() {
  30. close(p.stopCh)
  31. }
  32. // UpdateLastSeen updates the last activity time for this subscriber
  33. func (p *LocalSubscriber) UpdateLastSeen() {
  34. atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
  35. }
  36. // UpdateReceivedOffset updates the offset of the last message received by this subscriber
  37. func (p *LocalSubscriber) UpdateReceivedOffset(offset int64) {
  38. atomic.StoreInt64(&p.lastReceivedOffset, offset)
  39. atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
  40. }
  41. // UpdateAckedOffset updates the offset of the last message acknowledged by this subscriber
  42. func (p *LocalSubscriber) UpdateAckedOffset(offset int64) {
  43. atomic.StoreInt64(&p.lastAckedOffset, offset)
  44. atomic.StoreInt64(&p.lastSeenTimeNs, time.Now().UnixNano())
  45. }
  46. // GetTimestamps returns the connect and last seen timestamps safely
  47. func (p *LocalSubscriber) GetTimestamps() (connectTimeNs, lastSeenTimeNs int64) {
  48. return atomic.LoadInt64(&p.connectTimeNs), atomic.LoadInt64(&p.lastSeenTimeNs)
  49. }
  50. // GetOffsets returns the received and acknowledged offsets safely
  51. func (p *LocalSubscriber) GetOffsets() (lastReceivedOffset, lastAckedOffset int64) {
  52. return atomic.LoadInt64(&p.lastReceivedOffset), atomic.LoadInt64(&p.lastAckedOffset)
  53. }
  54. // GetCurrentOffset returns the acknowledged offset (for compatibility)
  55. func (p *LocalSubscriber) GetCurrentOffset() int64 {
  56. return atomic.LoadInt64(&p.lastAckedOffset)
  57. }
  58. func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
  59. return &LocalPartitionSubscribers{
  60. Subscribers: make(map[string]*LocalSubscriber),
  61. }
  62. }
  63. func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) {
  64. p.SubscribersLock.Lock()
  65. defer p.SubscribersLock.Unlock()
  66. p.Subscribers[clientName] = Subscriber
  67. }
  68. func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
  69. p.SubscribersLock.Lock()
  70. defer p.SubscribersLock.Unlock()
  71. delete(p.Subscribers, clientName)
  72. }
  73. func (p *LocalPartitionSubscribers) SignalShutdown() {
  74. p.SubscribersLock.RLock()
  75. defer p.SubscribersLock.RUnlock()
  76. for _, Subscriber := range p.Subscribers {
  77. Subscriber.SignalShutdown()
  78. }
  79. }
  80. func (p *LocalPartitionSubscribers) Size() int {
  81. p.SubscribersLock.RLock()
  82. defer p.SubscribersLock.RUnlock()
  83. return len(p.Subscribers)
  84. }
  85. // GetSubscriberNames returns the names of all subscribers
  86. func (p *LocalPartitionSubscribers) GetSubscriberNames() []string {
  87. p.SubscribersLock.RLock()
  88. defer p.SubscribersLock.RUnlock()
  89. names := make([]string, 0, len(p.Subscribers))
  90. for name := range p.Subscribers {
  91. names = append(names, name)
  92. }
  93. return names
  94. }
  95. // ForEachSubscriber iterates over all subscribers
  96. func (p *LocalPartitionSubscribers) ForEachSubscriber(fn func(name string, subscriber *LocalSubscriber)) {
  97. p.SubscribersLock.RLock()
  98. defer p.SubscribersLock.RUnlock()
  99. for name, subscriber := range p.Subscribers {
  100. fn(name, subscriber)
  101. }
  102. }