index.go 6.7 KB


  1. package api
  2. import (
  3. "bytes"
  4. "container/ring"
  5. "fmt"
  6. "io"
  7. "log/slog"
  8. "mime"
  9. "net/http"
  10. "path"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "github.com/ncarlier/webhookd/pkg/config"
  15. "github.com/ncarlier/webhookd/pkg/helper"
  16. "github.com/ncarlier/webhookd/pkg/hook"
  17. "github.com/ncarlier/webhookd/pkg/worker"
  18. )
  // Limits and media types used by the webhook handlers.
  const (
  	// DefaultBufferLength is the number of output lines kept for a buffered
  	// response when the X-Hook-MaxBufferedLines header is absent, not a
  	// number, or not strictly positive.
  	DefaultBufferLength = 100
  	// MaxBufferLength is the upper bound applied to the value of the
  	// X-Hook-MaxBufferedLines header.
  	MaxBufferLength = 10000
  	// SSEContentType is the media type that, once negotiated, switches the
  	// response to "sse" mode.
  	SSEContentType = "text/event-stream"
  )

  // Handler settings. They are zero-valued until index copies them from the
  // configuration.
  var (
  	// defaultTimeout is used when X-Hook-Timeout is absent or invalid.
  	defaultTimeout int
  	// defaultExt and scriptDir are handed to hook.ResolveScript, defaultMode
  	// is the output mode used when X-Hook-Mode is not "buffered" or
  	// "chunked", and outputDir is where hook logs are looked up.
  	defaultExt, defaultMode, scriptDir, outputDir string
  )

  // supportedContentTypes lists the response media types offered during
  // content negotiation.
  var supportedContentTypes = []string{
  	"text/plain",
  	SSEContentType,
  	"application/json",
  	"text/*",
  }
  // atoiFallback parses str as a base-10 integer and returns it when it is
  // strictly positive. If str is not a valid integer, or the value is zero
  // or negative, fallback is returned instead.
  func atoiFallback(str string, fallback int) int {
  	parsed, err := strconv.Atoi(str)
  	if err != nil || parsed <= 0 {
  		return fallback
  	}
  	return parsed
  }
  38. // index is the main handler of the API.
  39. func index(conf *config.Config) http.Handler {
  40. defaultTimeout = conf.Hook.Timeout
  41. defaultExt = conf.Hook.DefaultExt
  42. scriptDir = conf.Hook.ScriptsDir
  43. outputDir = conf.Hook.LogDir
  44. defaultMode = conf.Hook.DefaultMode
  45. return http.HandlerFunc(webhookHandler)
  46. }
  47. func webhookHandler(w http.ResponseWriter, r *http.Request) {
  48. if r.Method == "GET" {
  49. if _, err := strconv.Atoi(filepath.Base(r.URL.Path)); err == nil {
  50. getWebhookLog(w, r)
  51. return
  52. }
  53. }
  54. triggerWebhook(w, r)
  55. }
  56. func triggerWebhook(w http.ResponseWriter, r *http.Request) {
  57. // Manage content negotiation
  58. negociatedContentType := helper.NegotiateContentType(r, supportedContentTypes, "text/plain")
  59. // Extract streaming method
  60. mode := r.Header.Get("X-Hook-Mode")
  61. if mode != "buffered" && mode != "chunked" {
  62. mode = defaultMode
  63. }
  64. if negociatedContentType == SSEContentType {
  65. mode = "sse"
  66. }
  67. // Check that streaming is supported
  68. if _, ok := w.(http.Flusher); !ok && mode != "buffered" {
  69. http.Error(w, "streaming not supported", http.StatusInternalServerError)
  70. return
  71. }
  72. // Get hook location
  73. hookName := strings.TrimPrefix(r.URL.Path, "/")
  74. if hookName == "" {
  75. infoHandler(w, r)
  76. return
  77. }
  78. script, err := hook.ResolveScript(scriptDir, hookName, defaultExt)
  79. if err != nil {
  80. msg := "hook not found"
  81. slog.Error(msg, "err", err.Error())
  82. http.Error(w, msg, http.StatusNotFound)
  83. return
  84. }
  85. if err = r.ParseForm(); err != nil {
  86. msg := "unable to parse form-data"
  87. slog.Error(msg, "err", err)
  88. http.Error(w, msg, http.StatusBadRequest)
  89. return
  90. }
  91. // parse body
  92. var body []byte
  93. ct := r.Header.Get("Content-Type")
  94. if ct != "" {
  95. mediatype, _, _ := mime.ParseMediaType(ct)
  96. switch {
  97. case mediatype == "application/json", strings.HasPrefix(mediatype, "text/"):
  98. body, err = io.ReadAll(r.Body)
  99. if err != nil {
  100. msg := "unable to read request body"
  101. slog.Error(msg, "err", err)
  102. http.Error(w, msg, http.StatusBadRequest)
  103. return
  104. }
  105. case mediatype == "multipart/form-data":
  106. if err := r.ParseMultipartForm(8 << 20); err != nil {
  107. msg := "unable to parse multipart/form-data"
  108. slog.Error(msg, "err", err)
  109. http.Error(w, msg, http.StatusBadRequest)
  110. return
  111. }
  112. default:
  113. slog.Debug("unsuported media type", "media_type", mediatype)
  114. }
  115. }
  116. params := HTTPParamsToShellVars(r.Form)
  117. params = append(params, HTTPParamsToShellVars(r.Header)...)
  118. // Create hook job
  119. timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout)
  120. job, err := hook.NewHookJob(&hook.Request{
  121. Name: hookName,
  122. Script: script,
  123. Method: r.Method,
  124. Payload: string(body),
  125. Args: params,
  126. Timeout: timeout,
  127. OutputDir: outputDir,
  128. })
  129. if err != nil {
  130. msg := "unable to create hook execution job"
  131. slog.Error(msg, "err", err)
  132. http.Error(w, msg, http.StatusInternalServerError)
  133. return
  134. }
  135. // Put work in queue
  136. worker.WorkQueue <- job
  137. // Write hook ouput to the response regarding the asked method
  138. if mode != "buffered" {
  139. // Write hook response as Server Sent Event stream
  140. writeStreamedResponse(w, negociatedContentType, job, mode)
  141. } else {
  142. maxBufferLength := atoiFallback(r.Header.Get("X-Hook-MaxBufferedLines"), DefaultBufferLength)
  143. if maxBufferLength > MaxBufferLength {
  144. maxBufferLength = MaxBufferLength
  145. }
  146. // Write hook response after hook execution
  147. writeStandardResponse(w, negociatedContentType, job, maxBufferLength)
  148. }
  149. }
  150. func writeStreamedResponse(w http.ResponseWriter, negociatedContentType string, job *hook.Job, mode string) {
  151. writeHeaders(w, negociatedContentType, job.ID())
  152. for {
  153. msg, open := <-job.MessageChan
  154. if !open {
  155. break
  156. }
  157. if mode == "sse" {
  158. // Send SSE response
  159. prefix := "data: "
  160. if bytes.HasPrefix(msg, []byte("error:")) {
  161. prefix = ""
  162. }
  163. fmt.Fprintf(w, "%s%s\n", prefix, msg)
  164. } else {
  165. // Send chunked response
  166. w.Write(msg)
  167. }
  168. // Flush the data immediately instead of buffering it for later.
  169. if flusher, ok := w.(http.Flusher); ok {
  170. flusher.Flush()
  171. }
  172. }
  173. }
  174. func writeStandardResponse(w http.ResponseWriter, negociatedContentType string, job *hook.Job, maxBufferLength int) {
  175. buffer := ring.New(maxBufferLength)
  176. overflow := false
  177. lines := 0
  178. // Consume messages into a ring buffer
  179. for {
  180. msg, open := <-job.MessageChan
  181. if !open {
  182. break
  183. }
  184. buffer.Value = msg
  185. buffer = buffer.Next()
  186. lines++
  187. if lines > maxBufferLength {
  188. overflow = true
  189. }
  190. }
  191. writeHeaders(w, negociatedContentType, job.ID())
  192. w.WriteHeader(getJobStatusCode(job))
  193. if overflow {
  194. w.Write([]byte("[output truncated]\n"))
  195. }
  196. // Write buffer to HTTP response
  197. buffer.Do(func(data interface{}) {
  198. if data != nil {
  199. w.Write(data.([]byte))
  200. }
  201. })
  202. }
  203. func getJobStatusCode(job *hook.Job) int {
  204. switch {
  205. case job.ExitCode() == 0:
  206. return http.StatusOK
  207. case job.ExitCode() >= 100:
  208. return job.ExitCode() + 300
  209. default:
  210. return http.StatusInternalServerError
  211. }
  212. }
  // writeHeaders sets the response headers shared by hook responses: the
  // content type (with a UTF-8 charset suffix), no-cache, keep-alive and
  // nosniff directives, and the hook ID in X-Hook-ID as a decimal number.
  // It only fills the header map; nothing is sent to the client yet.
  func writeHeaders(w http.ResponseWriter, contentType string, hookId uint64) {
  	headers := w.Header()
  	headers.Set("Content-Type", contentType+"; charset=utf-8")
  	headers.Set("Cache-Control", "no-cache")
  	headers.Set("Connection", "keep-alive")
  	headers.Set("X-Content-Type-Options", "nosniff")
  	headers.Set("X-Hook-ID", strconv.FormatUint(hookId, 10))
  }
  220. func getWebhookLog(w http.ResponseWriter, r *http.Request) {
  221. // Get hook ID
  222. id := path.Base(r.URL.Path)
  223. // Get script location
  224. hookName := path.Dir(strings.TrimPrefix(r.URL.Path, "/"))
  225. _, err := hook.ResolveScript(scriptDir, hookName, defaultExt)
  226. if err != nil {
  227. slog.Error(err.Error())
  228. http.Error(w, err.Error(), http.StatusNotFound)
  229. return
  230. }
  231. // Retrieve log file
  232. logFile, err := hook.GetLogFile(id, hookName, outputDir)
  233. if err != nil {
  234. slog.Error(err.Error())
  235. http.Error(w, err.Error(), http.StatusInternalServerError)
  236. return
  237. }
  238. if logFile == nil {
  239. http.Error(w, "hook execution log not found", http.StatusNotFound)
  240. return
  241. }
  242. defer logFile.Close()
  243. w.Header().Set("Content-Type", "text/plain")
  244. io.Copy(w, logFile)
  245. }