mq_handlers.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package handlers
  2. import (
  3. "fmt"
  4. "net/http"
  5. "github.com/gin-gonic/gin"
  6. "github.com/seaweedfs/seaweedfs/weed/admin/dash"
  7. "github.com/seaweedfs/seaweedfs/weed/admin/view/app"
  8. "github.com/seaweedfs/seaweedfs/weed/admin/view/layout"
  9. )
  10. // MessageQueueHandlers contains all the HTTP handlers for message queue management
  11. type MessageQueueHandlers struct {
  12. adminServer *dash.AdminServer
  13. }
  14. // NewMessageQueueHandlers creates a new instance of MessageQueueHandlers
  15. func NewMessageQueueHandlers(adminServer *dash.AdminServer) *MessageQueueHandlers {
  16. return &MessageQueueHandlers{
  17. adminServer: adminServer,
  18. }
  19. }
  20. // ShowBrokers renders the message queue brokers page
  21. func (h *MessageQueueHandlers) ShowBrokers(c *gin.Context) {
  22. // Get cluster brokers data
  23. brokersData, err := h.adminServer.GetClusterBrokers()
  24. if err != nil {
  25. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get cluster brokers: " + err.Error()})
  26. return
  27. }
  28. // Set username
  29. username := c.GetString("username")
  30. if username == "" {
  31. username = "admin"
  32. }
  33. brokersData.Username = username
  34. // Render HTML template
  35. c.Header("Content-Type", "text/html")
  36. brokersComponent := app.ClusterBrokers(*brokersData)
  37. layoutComponent := layout.Layout(c, brokersComponent)
  38. err = layoutComponent.Render(c.Request.Context(), c.Writer)
  39. if err != nil {
  40. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
  41. return
  42. }
  43. }
  44. // ShowTopics renders the message queue topics page
  45. func (h *MessageQueueHandlers) ShowTopics(c *gin.Context) {
  46. // Get topics data
  47. topicsData, err := h.adminServer.GetTopics()
  48. if err != nil {
  49. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topics: " + err.Error()})
  50. return
  51. }
  52. // Set username
  53. username := c.GetString("username")
  54. if username == "" {
  55. username = "admin"
  56. }
  57. topicsData.Username = username
  58. // Render HTML template
  59. c.Header("Content-Type", "text/html")
  60. topicsComponent := app.Topics(*topicsData)
  61. layoutComponent := layout.Layout(c, topicsComponent)
  62. err = layoutComponent.Render(c.Request.Context(), c.Writer)
  63. if err != nil {
  64. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
  65. return
  66. }
  67. }
  68. // ShowSubscribers renders the message queue subscribers page
  69. func (h *MessageQueueHandlers) ShowSubscribers(c *gin.Context) {
  70. // Get subscribers data
  71. subscribersData, err := h.adminServer.GetSubscribers()
  72. if err != nil {
  73. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get subscribers: " + err.Error()})
  74. return
  75. }
  76. // Set username
  77. username := c.GetString("username")
  78. if username == "" {
  79. username = "admin"
  80. }
  81. subscribersData.Username = username
  82. // Render HTML template
  83. c.Header("Content-Type", "text/html")
  84. subscribersComponent := app.Subscribers(*subscribersData)
  85. layoutComponent := layout.Layout(c, subscribersComponent)
  86. err = layoutComponent.Render(c.Request.Context(), c.Writer)
  87. if err != nil {
  88. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
  89. return
  90. }
  91. }
  92. // ShowTopicDetails renders the topic details page
  93. func (h *MessageQueueHandlers) ShowTopicDetails(c *gin.Context) {
  94. // Get topic parameters from URL
  95. namespace := c.Param("namespace")
  96. topicName := c.Param("topic")
  97. if namespace == "" || topicName == "" {
  98. c.JSON(http.StatusBadRequest, gin.H{"error": "Missing namespace or topic name"})
  99. return
  100. }
  101. // Get topic details data
  102. topicDetailsData, err := h.adminServer.GetTopicDetails(namespace, topicName)
  103. if err != nil {
  104. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topic details: " + err.Error()})
  105. return
  106. }
  107. // Set username
  108. username := c.GetString("username")
  109. if username == "" {
  110. username = "admin"
  111. }
  112. topicDetailsData.Username = username
  113. // Render HTML template
  114. c.Header("Content-Type", "text/html")
  115. topicDetailsComponent := app.TopicDetails(*topicDetailsData)
  116. layoutComponent := layout.Layout(c, topicDetailsComponent)
  117. err = layoutComponent.Render(c.Request.Context(), c.Writer)
  118. if err != nil {
  119. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
  120. return
  121. }
  122. }
  123. // GetTopicDetailsAPI returns topic details as JSON for AJAX calls
  124. func (h *MessageQueueHandlers) GetTopicDetailsAPI(c *gin.Context) {
  125. // Get topic parameters from URL
  126. namespace := c.Param("namespace")
  127. topicName := c.Param("topic")
  128. if namespace == "" || topicName == "" {
  129. c.JSON(http.StatusBadRequest, gin.H{"error": "Missing namespace or topic name"})
  130. return
  131. }
  132. // Get topic details data
  133. topicDetailsData, err := h.adminServer.GetTopicDetails(namespace, topicName)
  134. if err != nil {
  135. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topic details: " + err.Error()})
  136. return
  137. }
  138. // Return JSON data
  139. c.JSON(http.StatusOK, topicDetailsData)
  140. }
  141. // CreateTopicAPI creates a new topic with retention configuration
  142. func (h *MessageQueueHandlers) CreateTopicAPI(c *gin.Context) {
  143. var req struct {
  144. Namespace string `json:"namespace" binding:"required"`
  145. Name string `json:"name" binding:"required"`
  146. PartitionCount int32 `json:"partition_count" binding:"required"`
  147. Retention struct {
  148. Enabled bool `json:"enabled"`
  149. RetentionSeconds int64 `json:"retention_seconds"`
  150. } `json:"retention"`
  151. }
  152. if err := c.ShouldBindJSON(&req); err != nil {
  153. c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request: " + err.Error()})
  154. return
  155. }
  156. // Validate inputs
  157. if req.PartitionCount < 1 || req.PartitionCount > 100 {
  158. c.JSON(http.StatusBadRequest, gin.H{"error": "Partition count must be between 1 and 100"})
  159. return
  160. }
  161. if req.Retention.Enabled && req.Retention.RetentionSeconds <= 0 {
  162. c.JSON(http.StatusBadRequest, gin.H{"error": "Retention seconds must be positive when retention is enabled"})
  163. return
  164. }
  165. // Create the topic via admin server
  166. err := h.adminServer.CreateTopicWithRetention(req.Namespace, req.Name, req.PartitionCount, req.Retention.Enabled, req.Retention.RetentionSeconds)
  167. if err != nil {
  168. c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create topic: " + err.Error()})
  169. return
  170. }
  171. c.JSON(http.StatusOK, gin.H{
  172. "message": "Topic created successfully",
  173. "topic": fmt.Sprintf("%s.%s", req.Namespace, req.Name),
  174. })
  175. }
  // UpdateTopicRetentionRequest is the JSON request body that
  // UpdateTopicRetentionAPI binds.
  type UpdateTopicRetentionRequest struct {
  	// Namespace and Name together identify the topic to update.
  	Namespace string `json:"namespace"`
  	Name      string `json:"name"`
  	// Retention carries the retention settings forwarded to the admin server.
  	Retention struct {
  		Enabled          bool  `json:"enabled"`
  		RetentionSeconds int64 `json:"retention_seconds"`
  	} `json:"retention"`
  }
  184. func (h *MessageQueueHandlers) UpdateTopicRetentionAPI(c *gin.Context) {
  185. var request UpdateTopicRetentionRequest
  186. if err := c.ShouldBindJSON(&request); err != nil {
  187. c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
  188. return
  189. }
  190. // Validate required fields
  191. if request.Namespace == "" || request.Name == "" {
  192. c.JSON(http.StatusBadRequest, gin.H{"error": "namespace and name are required"})
  193. return
  194. }
  195. // Update the topic retention
  196. err := h.adminServer.UpdateTopicRetention(request.Namespace, request.Name, request.Retention.Enabled, request.Retention.RetentionSeconds)
  197. if err != nil {
  198. c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
  199. return
  200. }
  201. c.JSON(http.StatusOK, gin.H{
  202. "message": "Topic retention updated successfully",
  203. "topic": request.Namespace + "." + request.Name,
  204. })
  205. }