| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- package handlers
- import (
- "fmt"
- "net/http"
- "github.com/gin-gonic/gin"
- "github.com/seaweedfs/seaweedfs/weed/admin/dash"
- "github.com/seaweedfs/seaweedfs/weed/admin/view/app"
- "github.com/seaweedfs/seaweedfs/weed/admin/view/layout"
- )
- // MessageQueueHandlers contains all the HTTP handlers for message queue management
- type MessageQueueHandlers struct {
- adminServer *dash.AdminServer
- }
- // NewMessageQueueHandlers creates a new instance of MessageQueueHandlers
- func NewMessageQueueHandlers(adminServer *dash.AdminServer) *MessageQueueHandlers {
- return &MessageQueueHandlers{
- adminServer: adminServer,
- }
- }
- // ShowBrokers renders the message queue brokers page
- func (h *MessageQueueHandlers) ShowBrokers(c *gin.Context) {
- // Get cluster brokers data
- brokersData, err := h.adminServer.GetClusterBrokers()
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get cluster brokers: " + err.Error()})
- return
- }
- // Set username
- username := c.GetString("username")
- if username == "" {
- username = "admin"
- }
- brokersData.Username = username
- // Render HTML template
- c.Header("Content-Type", "text/html")
- brokersComponent := app.ClusterBrokers(*brokersData)
- layoutComponent := layout.Layout(c, brokersComponent)
- err = layoutComponent.Render(c.Request.Context(), c.Writer)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
- return
- }
- }
- // ShowTopics renders the message queue topics page
- func (h *MessageQueueHandlers) ShowTopics(c *gin.Context) {
- // Get topics data
- topicsData, err := h.adminServer.GetTopics()
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topics: " + err.Error()})
- return
- }
- // Set username
- username := c.GetString("username")
- if username == "" {
- username = "admin"
- }
- topicsData.Username = username
- // Render HTML template
- c.Header("Content-Type", "text/html")
- topicsComponent := app.Topics(*topicsData)
- layoutComponent := layout.Layout(c, topicsComponent)
- err = layoutComponent.Render(c.Request.Context(), c.Writer)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
- return
- }
- }
- // ShowSubscribers renders the message queue subscribers page
- func (h *MessageQueueHandlers) ShowSubscribers(c *gin.Context) {
- // Get subscribers data
- subscribersData, err := h.adminServer.GetSubscribers()
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get subscribers: " + err.Error()})
- return
- }
- // Set username
- username := c.GetString("username")
- if username == "" {
- username = "admin"
- }
- subscribersData.Username = username
- // Render HTML template
- c.Header("Content-Type", "text/html")
- subscribersComponent := app.Subscribers(*subscribersData)
- layoutComponent := layout.Layout(c, subscribersComponent)
- err = layoutComponent.Render(c.Request.Context(), c.Writer)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
- return
- }
- }
- // ShowTopicDetails renders the topic details page
- func (h *MessageQueueHandlers) ShowTopicDetails(c *gin.Context) {
- // Get topic parameters from URL
- namespace := c.Param("namespace")
- topicName := c.Param("topic")
- if namespace == "" || topicName == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Missing namespace or topic name"})
- return
- }
- // Get topic details data
- topicDetailsData, err := h.adminServer.GetTopicDetails(namespace, topicName)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topic details: " + err.Error()})
- return
- }
- // Set username
- username := c.GetString("username")
- if username == "" {
- username = "admin"
- }
- topicDetailsData.Username = username
- // Render HTML template
- c.Header("Content-Type", "text/html")
- topicDetailsComponent := app.TopicDetails(*topicDetailsData)
- layoutComponent := layout.Layout(c, topicDetailsComponent)
- err = layoutComponent.Render(c.Request.Context(), c.Writer)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to render template: " + err.Error()})
- return
- }
- }
- // GetTopicDetailsAPI returns topic details as JSON for AJAX calls
- func (h *MessageQueueHandlers) GetTopicDetailsAPI(c *gin.Context) {
- // Get topic parameters from URL
- namespace := c.Param("namespace")
- topicName := c.Param("topic")
- if namespace == "" || topicName == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Missing namespace or topic name"})
- return
- }
- // Get topic details data
- topicDetailsData, err := h.adminServer.GetTopicDetails(namespace, topicName)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to get topic details: " + err.Error()})
- return
- }
- // Return JSON data
- c.JSON(http.StatusOK, topicDetailsData)
- }
- // CreateTopicAPI creates a new topic with retention configuration
- func (h *MessageQueueHandlers) CreateTopicAPI(c *gin.Context) {
- var req struct {
- Namespace string `json:"namespace" binding:"required"`
- Name string `json:"name" binding:"required"`
- PartitionCount int32 `json:"partition_count" binding:"required"`
- Retention struct {
- Enabled bool `json:"enabled"`
- RetentionSeconds int64 `json:"retention_seconds"`
- } `json:"retention"`
- }
- if err := c.ShouldBindJSON(&req); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request: " + err.Error()})
- return
- }
- // Validate inputs
- if req.PartitionCount < 1 || req.PartitionCount > 100 {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Partition count must be between 1 and 100"})
- return
- }
- if req.Retention.Enabled && req.Retention.RetentionSeconds <= 0 {
- c.JSON(http.StatusBadRequest, gin.H{"error": "Retention seconds must be positive when retention is enabled"})
- return
- }
- // Create the topic via admin server
- err := h.adminServer.CreateTopicWithRetention(req.Namespace, req.Name, req.PartitionCount, req.Retention.Enabled, req.Retention.RetentionSeconds)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create topic: " + err.Error()})
- return
- }
- c.JSON(http.StatusOK, gin.H{
- "message": "Topic created successfully",
- "topic": fmt.Sprintf("%s.%s", req.Namespace, req.Name),
- })
- }
- type UpdateTopicRetentionRequest struct {
- Namespace string `json:"namespace"`
- Name string `json:"name"`
- Retention struct {
- Enabled bool `json:"enabled"`
- RetentionSeconds int64 `json:"retention_seconds"`
- } `json:"retention"`
- }
- func (h *MessageQueueHandlers) UpdateTopicRetentionAPI(c *gin.Context) {
- var request UpdateTopicRetentionRequest
- if err := c.ShouldBindJSON(&request); err != nil {
- c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
- return
- }
- // Validate required fields
- if request.Namespace == "" || request.Name == "" {
- c.JSON(http.StatusBadRequest, gin.H{"error": "namespace and name are required"})
- return
- }
- // Update the topic retention
- err := h.adminServer.UpdateTopicRetention(request.Namespace, request.Name, request.Retention.Enabled, request.Retention.RetentionSeconds)
- if err != nil {
- c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
- return
- }
- c.JSON(http.StatusOK, gin.H{
- "message": "Topic retention updated successfully",
- "topic": request.Namespace + "." + request.Name,
- })
- }
|