webhook_queue.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package webhook
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/ThreeDotsLabs/watermill"
  8. "github.com/ThreeDotsLabs/watermill/message"
  9. "github.com/ThreeDotsLabs/watermill/message/router/middleware"
  10. "github.com/ThreeDotsLabs/watermill/message/router/plugin"
  11. "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/notification"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "google.golang.org/protobuf/proto"
  17. )
  18. func init() {
  19. notification.MessageQueues = append(notification.MessageQueues, &Queue{})
  20. }
  21. type Queue struct {
  22. router *message.Router
  23. queueChannel *gochannel.GoChannel
  24. config *config
  25. client client
  26. filter *filter
  27. ctx context.Context
  28. cancel context.CancelFunc
  29. }
  30. func (w *Queue) GetName() string {
  31. return queueName
  32. }
  33. func (w *Queue) SendMessage(key string, msg proto.Message) error {
  34. eventNotification, ok := msg.(*filer_pb.EventNotification)
  35. if !ok {
  36. return nil
  37. }
  38. if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
  39. return nil
  40. }
  41. m := newWebhookMessage(key, msg)
  42. if m == nil {
  43. return nil
  44. }
  45. wMsg, err := m.toWaterMillMessage()
  46. if err != nil {
  47. return err
  48. }
  49. return w.queueChannel.Publish(pubSubTopicName, wMsg)
  50. }
  51. func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
  52. payload, err := proto.Marshal(w.Notification)
  53. if err != nil {
  54. return nil, err
  55. }
  56. msg := message.NewMessage(watermill.NewUUID(), payload)
  57. // Set event type and key as metadata
  58. msg.Metadata.Set("event_type", w.EventType)
  59. msg.Metadata.Set("key", w.Key)
  60. return msg, nil
  61. }
  62. func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
  63. c := newConfigWithDefaults(configuration, prefix)
  64. if err := c.validate(); err != nil {
  65. return err
  66. }
  67. return w.initialize(c)
  68. }
  69. func (w *Queue) initialize(cfg *config) error {
  70. w.ctx, w.cancel = context.WithCancel(context.Background())
  71. w.config = cfg
  72. w.filter = newFilter(cfg)
  73. hClient, err := newHTTPClient(cfg)
  74. if err != nil {
  75. return fmt.Errorf("failed to create webhook http client: %w", err)
  76. }
  77. w.client = hClient
  78. if err = w.setupWatermillQueue(cfg); err != nil {
  79. return fmt.Errorf("failed to setup watermill queue: %w", err)
  80. }
  81. if err = w.logDeadLetterMessages(); err != nil {
  82. return err
  83. }
  84. return nil
  85. }
  86. func (w *Queue) setupWatermillQueue(cfg *config) error {
  87. logger := watermill.NewStdLogger(false, false)
  88. pubSubConfig := gochannel.Config{
  89. OutputChannelBuffer: int64(cfg.bufferSize),
  90. Persistent: false,
  91. }
  92. w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
  93. router, err := message.NewRouter(
  94. message.RouterConfig{
  95. CloseTimeout: 60 * time.Second,
  96. },
  97. logger,
  98. )
  99. if err != nil {
  100. return fmt.Errorf("failed to create router: %w", err)
  101. }
  102. w.router = router
  103. retryMiddleware := middleware.Retry{
  104. MaxRetries: cfg.maxRetries,
  105. InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
  106. MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
  107. Multiplier: 2.0,
  108. RandomizationFactor: 0.3,
  109. Logger: logger,
  110. }.Middleware
  111. poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
  112. if err != nil {
  113. return fmt.Errorf("failed to create poison queue: %w", err)
  114. }
  115. router.AddPlugin(plugin.SignalsHandler)
  116. router.AddMiddleware(retryMiddleware, poisonQueue)
  117. for i := 0; i < cfg.nWorkers; i++ {
  118. router.AddNoPublisherHandler(
  119. pubSubHandlerNameTemplate(i),
  120. pubSubTopicName,
  121. w.queueChannel,
  122. w.handleWebhook,
  123. )
  124. }
  125. go func() {
  126. // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
  127. defer w.cancel()
  128. if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
  129. glog.Errorf("webhook pubsub worker stopped with error: %v", err)
  130. }
  131. glog.Info("webhook pubsub worker stopped")
  132. }()
  133. return nil
  134. }
  135. func (w *Queue) handleWebhook(msg *message.Message) error {
  136. var n filer_pb.EventNotification
  137. if err := proto.Unmarshal(msg.Payload, &n); err != nil {
  138. glog.Errorf("failed to unmarshal protobuf message: %v", err)
  139. return err
  140. }
  141. // Reconstruct webhook message from metadata and payload
  142. webhookMsg := &webhookMessage{
  143. Key: msg.Metadata.Get("key"),
  144. EventType: msg.Metadata.Get("event_type"),
  145. Notification: &n,
  146. }
  147. if err := w.client.sendMessage(webhookMsg); err != nil {
  148. glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
  149. return err
  150. }
  151. return nil
  152. }
  153. func (w *Queue) logDeadLetterMessages() error {
  154. ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
  155. if err != nil {
  156. return err
  157. }
  158. go func() {
  159. for {
  160. select {
  161. case msg, ok := <-ch:
  162. if !ok {
  163. glog.Info("dead letter channel closed")
  164. return
  165. }
  166. if msg == nil {
  167. glog.Errorf("received nil message from dead letter channel")
  168. continue
  169. }
  170. key := "unknown"
  171. if msg.Metadata != nil {
  172. if keyValue, exists := msg.Metadata["key"]; exists {
  173. key = keyValue
  174. }
  175. }
  176. payload := ""
  177. if msg.Payload != nil {
  178. var n filer_pb.EventNotification
  179. if err := proto.Unmarshal(msg.Payload, &n); err != nil {
  180. payload = fmt.Sprintf("failed to unmarshal payload: %v", err)
  181. } else {
  182. payload = n.String()
  183. }
  184. }
  185. glog.Errorf("received dead letter message: %s, key: %s", payload, key)
  186. case <-w.ctx.Done():
  187. return
  188. }
  189. }
  190. }()
  191. return nil
  192. }