worker.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package worker
  2. import (
  3. "fmt"
  4. "log/slog"
  5. "github.com/ncarlier/webhookd/pkg/metric"
  6. "github.com/ncarlier/webhookd/pkg/notification"
  7. )
  8. // NewWorker creates, and returns a new Worker object.
  9. func NewWorker(id int, workerQueue chan chan Work) Worker {
  10. // Create, and return the worker.
  11. worker := Worker{
  12. ID: id,
  13. Work: make(chan Work),
  14. WorkerQueue: workerQueue,
  15. QuitChan: make(chan bool),
  16. }
  17. return worker
  18. }
  19. // Worker is a go routine in charge of executing a work.
  20. type Worker struct {
  21. ID int
  22. Work chan Work
  23. WorkerQueue chan chan Work
  24. QuitChan chan bool
  25. }
  26. // Start is the function to starts the worker by starting a goroutine.
  27. // That is an infinite "for-select" loop.
  28. func (w Worker) Start() {
  29. go func() {
  30. for {
  31. // Add ourselves into the worker queue.
  32. w.WorkerQueue <- w.Work
  33. select {
  34. case work := <-w.Work:
  35. // Receive a work request.
  36. slog.Debug("hook execution request received", "worker", w.ID, "hook", work.Name(), "id", work.ID())
  37. metric.Requests.Add(1)
  38. err := work.Run()
  39. if err != nil {
  40. metric.RequestsFailed.Add(1)
  41. work.SendMessage(fmt.Sprintf("error: %s", err.Error()))
  42. }
  43. // Send notification
  44. go notification.Notify(work)
  45. work.Close()
  46. case <-w.QuitChan:
  47. slog.Debug("stopping worker...", "worker", w.ID)
  48. return
  49. }
  50. }
  51. }()
  52. }
  53. // Stop tells the worker to stop listening for work requests.
  54. // Note that the worker will only stop *after* it has finished its work.
  55. func (w Worker) Stop() {
  56. go func() {
  57. w.QuitChan <- true
  58. }()
  59. }