http.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package webhook
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  13. )
  // httpClient holds the settings used to deliver webhook messages over HTTP.
  type httpClient struct {
  	// endpoint is the URL every message is POSTed to.
  	endpoint string
  	// token, when non-empty, is sent as "Authorization: Bearer <token>".
  	token string
  	// timeout bounds a single request; values <= 0 leave the request
  	// without a deadline of its own.
  	timeout time.Duration
  }
  19. func newHTTPClient(cfg *config) (*httpClient, error) {
  20. return &httpClient{
  21. endpoint: cfg.endpoint,
  22. token: cfg.authBearerToken,
  23. timeout: time.Duration(cfg.timeoutSeconds) * time.Second,
  24. }, nil
  25. }
  26. func (h *httpClient) sendMessage(message *webhookMessage) error {
  27. // Serialize the protobuf message to JSON for HTTP payload
  28. notificationData, err := json.Marshal(message.Notification)
  29. if err != nil {
  30. return fmt.Errorf("failed to marshal notification: %w", err)
  31. }
  32. payload := map[string]interface{}{
  33. "key": message.Key,
  34. "event_type": message.EventType,
  35. "message": json.RawMessage(notificationData),
  36. }
  37. jsonData, err := json.Marshal(payload)
  38. if err != nil {
  39. return fmt.Errorf("failed to marshal message: %w", err)
  40. }
  41. req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(jsonData))
  42. if err != nil {
  43. return fmt.Errorf("failed to create request: %w", err)
  44. }
  45. req.Header.Set("Content-Type", "application/json")
  46. if h.token != "" {
  47. req.Header.Set("Authorization", "Bearer "+h.token)
  48. }
  49. if h.timeout > 0 {
  50. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  51. defer cancel()
  52. req = req.WithContext(ctx)
  53. }
  54. resp, err := util_http.Do(req)
  55. if err != nil {
  56. if err = drainResponse(resp); err != nil {
  57. glog.Errorf("failed to drain response: %v", err)
  58. }
  59. return fmt.Errorf("failed to send request: %w", err)
  60. }
  61. defer resp.Body.Close()
  62. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  63. return fmt.Errorf("webhook returned status code: %d", resp.StatusCode)
  64. }
  65. return nil
  66. }
  // drainResponse reads resp.Body to the end, discarding the data, and then
  // closes it. A nil response or a nil body is a no-op.
  //
  // It returns the read error and the close error combined with errors.Join,
  // which is nil when both steps succeed.
  func drainResponse(resp *http.Response) error {
  	if resp == nil || resp.Body == nil {
  		return nil
  	}
  	// Copy into io.Discard rather than io.ReadAll so the body is never
  	// buffered in memory just to be thrown away.
  	_, err := io.Copy(io.Discard, resp.Body)
  	return errors.Join(err, resp.Body.Close())
  }
  76. }