| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package webhook
- import (
- "context"
- "errors"
- "fmt"
- "time"
- "github.com/ThreeDotsLabs/watermill"
- "github.com/ThreeDotsLabs/watermill/message"
- "github.com/ThreeDotsLabs/watermill/message/router/middleware"
- "github.com/ThreeDotsLabs/watermill/message/router/plugin"
- "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/notification"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/protobuf/proto"
- )
- func init() {
- notification.MessageQueues = append(notification.MessageQueues, &Queue{})
- }
- type Queue struct {
- router *message.Router
- queueChannel *gochannel.GoChannel
- config *config
- client client
- filter *filter
- ctx context.Context
- cancel context.CancelFunc
- }
- func (w *Queue) GetName() string {
- return queueName
- }
- func (w *Queue) SendMessage(key string, msg proto.Message) error {
- eventNotification, ok := msg.(*filer_pb.EventNotification)
- if !ok {
- return nil
- }
- if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
- return nil
- }
- m := newWebhookMessage(key, msg)
- if m == nil {
- return nil
- }
- wMsg, err := m.toWaterMillMessage()
- if err != nil {
- return err
- }
- return w.queueChannel.Publish(pubSubTopicName, wMsg)
- }
- func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
- payload, err := proto.Marshal(w.Notification)
- if err != nil {
- return nil, err
- }
- msg := message.NewMessage(watermill.NewUUID(), payload)
- // Set event type and key as metadata
- msg.Metadata.Set("event_type", w.EventType)
- msg.Metadata.Set("key", w.Key)
- return msg, nil
- }
- func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
- c := newConfigWithDefaults(configuration, prefix)
- if err := c.validate(); err != nil {
- return err
- }
- return w.initialize(c)
- }
- func (w *Queue) initialize(cfg *config) error {
- w.ctx, w.cancel = context.WithCancel(context.Background())
- w.config = cfg
- w.filter = newFilter(cfg)
- hClient, err := newHTTPClient(cfg)
- if err != nil {
- return fmt.Errorf("failed to create webhook http client: %w", err)
- }
- w.client = hClient
- if err = w.setupWatermillQueue(cfg); err != nil {
- return fmt.Errorf("failed to setup watermill queue: %w", err)
- }
- if err = w.logDeadLetterMessages(); err != nil {
- return err
- }
- return nil
- }
- func (w *Queue) setupWatermillQueue(cfg *config) error {
- logger := watermill.NewStdLogger(false, false)
- pubSubConfig := gochannel.Config{
- OutputChannelBuffer: int64(cfg.bufferSize),
- Persistent: false,
- }
- w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
- router, err := message.NewRouter(
- message.RouterConfig{
- CloseTimeout: 60 * time.Second,
- },
- logger,
- )
- if err != nil {
- return fmt.Errorf("failed to create router: %w", err)
- }
- w.router = router
- retryMiddleware := middleware.Retry{
- MaxRetries: cfg.maxRetries,
- InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
- MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
- Multiplier: 2.0,
- RandomizationFactor: 0.3,
- Logger: logger,
- }.Middleware
- poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
- if err != nil {
- return fmt.Errorf("failed to create poison queue: %w", err)
- }
- router.AddPlugin(plugin.SignalsHandler)
- router.AddMiddleware(retryMiddleware, poisonQueue)
- for i := 0; i < cfg.nWorkers; i++ {
- router.AddNoPublisherHandler(
- pubSubHandlerNameTemplate(i),
- pubSubTopicName,
- w.queueChannel,
- w.handleWebhook,
- )
- }
- go func() {
- // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
- defer w.cancel()
- if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
- glog.Errorf("webhook pubsub worker stopped with error: %v", err)
- }
- glog.Info("webhook pubsub worker stopped")
- }()
- return nil
- }
- func (w *Queue) handleWebhook(msg *message.Message) error {
- var n filer_pb.EventNotification
- if err := proto.Unmarshal(msg.Payload, &n); err != nil {
- glog.Errorf("failed to unmarshal protobuf message: %v", err)
- return err
- }
- // Reconstruct webhook message from metadata and payload
- webhookMsg := &webhookMessage{
- Key: msg.Metadata.Get("key"),
- EventType: msg.Metadata.Get("event_type"),
- Notification: &n,
- }
- if err := w.client.sendMessage(webhookMsg); err != nil {
- glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
- return err
- }
- return nil
- }
- func (w *Queue) logDeadLetterMessages() error {
- ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
- if err != nil {
- return err
- }
- go func() {
- for {
- select {
- case msg, ok := <-ch:
- if !ok {
- glog.Info("dead letter channel closed")
- return
- }
- if msg == nil {
- glog.Errorf("received nil message from dead letter channel")
- continue
- }
- key := "unknown"
- if msg.Metadata != nil {
- if keyValue, exists := msg.Metadata["key"]; exists {
- key = keyValue
- }
- }
- payload := ""
- if msg.Payload != nil {
- var n filer_pb.EventNotification
- if err := proto.Unmarshal(msg.Payload, &n); err != nil {
- payload = fmt.Sprintf("failed to unmarshal payload: %v", err)
- } else {
- payload = n.String()
- }
- }
- glog.Errorf("received dead letter message: %s, key: %s", payload, key)
- case <-w.ctx.Done():
- return
- }
- }
- }()
- return nil
- }
|