main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
  12. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  14. )
  15. var (
  16. agentAddr = flag.String("agent", "localhost:16777", "MQ agent address")
  17. topicNamespace = flag.String("namespace", "test", "topic namespace")
  18. topicName = flag.String("topic", "test-topic", "topic name")
  19. consumerGroup = flag.String("group", "test-consumer-group", "consumer group name")
  20. consumerGroupInstanceId = flag.String("instance", "test-consumer-1", "consumer group instance id")
  21. maxPartitions = flag.Int("max-partitions", 10, "maximum number of partitions to consume")
  22. slidingWindowSize = flag.Int("window-size", 100, "sliding window size for concurrent processing")
  23. offsetType = flag.String("offset", "latest", "offset type: earliest, latest, timestamp")
  24. offsetTsNs = flag.Int64("offset-ts", 0, "offset timestamp in nanoseconds (for timestamp offset type)")
  25. showMessages = flag.Bool("show-messages", true, "show consumed messages")
  26. logProgress = flag.Bool("log-progress", true, "log progress every 10 messages")
  27. filter = flag.String("filter", "", "message filter")
  28. )
  29. func main() {
  30. flag.Parse()
  31. fmt.Printf("Starting message consumer:\n")
  32. fmt.Printf(" Agent: %s\n", *agentAddr)
  33. fmt.Printf(" Topic: %s.%s\n", *topicNamespace, *topicName)
  34. fmt.Printf(" Consumer Group: %s\n", *consumerGroup)
  35. fmt.Printf(" Consumer Instance: %s\n", *consumerGroupInstanceId)
  36. fmt.Printf(" Max Partitions: %d\n", *maxPartitions)
  37. fmt.Printf(" Sliding Window Size: %d\n", *slidingWindowSize)
  38. fmt.Printf(" Offset Type: %s\n", *offsetType)
  39. fmt.Printf(" Filter: %s\n", *filter)
  40. // Create topic
  41. topicObj := topic.NewTopic(*topicNamespace, *topicName)
  42. // Determine offset type
  43. var pbOffsetType schema_pb.OffsetType
  44. switch *offsetType {
  45. case "earliest":
  46. pbOffsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
  47. case "latest":
  48. pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
  49. case "timestamp":
  50. pbOffsetType = schema_pb.OffsetType_EXACT_TS_NS
  51. default:
  52. pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
  53. }
  54. // Create subscribe option
  55. option := &agent_client.SubscribeOption{
  56. ConsumerGroup: *consumerGroup,
  57. ConsumerGroupInstanceId: *consumerGroupInstanceId,
  58. Topic: topicObj,
  59. OffsetType: pbOffsetType,
  60. OffsetTsNs: *offsetTsNs,
  61. Filter: *filter,
  62. MaxSubscribedPartitions: int32(*maxPartitions),
  63. SlidingWindowSize: int32(*slidingWindowSize),
  64. }
  65. // Create subscribe session
  66. session, err := agent_client.NewSubscribeSession(*agentAddr, option)
  67. if err != nil {
  68. log.Fatalf("Failed to create subscribe session: %v", err)
  69. }
  70. defer session.CloseSession()
  71. // Statistics
  72. var messageCount int64
  73. var mu sync.Mutex
  74. startTime := time.Now()
  75. // Handle graceful shutdown
  76. sigChan := make(chan os.Signal, 1)
  77. signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
  78. // Channel to signal completion
  79. done := make(chan error, 1)
  80. // Start consuming messages
  81. fmt.Printf("\nStarting to consume messages...\n")
  82. go func() {
  83. err := session.SubscribeMessageRecord(
  84. // onEachMessageFn
  85. func(key []byte, record *schema_pb.RecordValue) {
  86. mu.Lock()
  87. messageCount++
  88. currentCount := messageCount
  89. mu.Unlock()
  90. if *showMessages {
  91. fmt.Printf("Received message: key=%s\n", string(key))
  92. printRecordValue(record)
  93. }
  94. if *logProgress && currentCount%10 == 0 {
  95. elapsed := time.Since(startTime)
  96. rate := float64(currentCount) / elapsed.Seconds()
  97. fmt.Printf("Consumed %d messages (%.2f msg/sec)\n", currentCount, rate)
  98. }
  99. },
  100. // onCompletionFn
  101. func() {
  102. fmt.Printf("Subscription completed\n")
  103. done <- nil
  104. },
  105. )
  106. if err != nil {
  107. done <- err
  108. }
  109. }()
  110. // Wait for signal or completion
  111. select {
  112. case <-sigChan:
  113. fmt.Printf("\nReceived shutdown signal, stopping consumer...\n")
  114. case err := <-done:
  115. if err != nil {
  116. log.Printf("Subscription error: %v", err)
  117. }
  118. }
  119. // Print final statistics
  120. mu.Lock()
  121. finalCount := messageCount
  122. mu.Unlock()
  123. duration := time.Since(startTime)
  124. fmt.Printf("Consumed %d messages in %v\n", finalCount, duration)
  125. if duration.Seconds() > 0 {
  126. fmt.Printf("Average throughput: %.2f messages/sec\n", float64(finalCount)/duration.Seconds())
  127. }
  128. }
  129. func printRecordValue(record *schema_pb.RecordValue) {
  130. if record == nil || record.Fields == nil {
  131. fmt.Printf(" (empty record)\n")
  132. return
  133. }
  134. for fieldName, value := range record.Fields {
  135. fmt.Printf(" %s: %s\n", fieldName, formatValue(value))
  136. }
  137. }
  138. func formatValue(value *schema_pb.Value) string {
  139. if value == nil {
  140. return "(nil)"
  141. }
  142. switch kind := value.Kind.(type) {
  143. case *schema_pb.Value_BoolValue:
  144. return fmt.Sprintf("%t", kind.BoolValue)
  145. case *schema_pb.Value_Int32Value:
  146. return fmt.Sprintf("%d", kind.Int32Value)
  147. case *schema_pb.Value_Int64Value:
  148. return fmt.Sprintf("%d", kind.Int64Value)
  149. case *schema_pb.Value_FloatValue:
  150. return fmt.Sprintf("%f", kind.FloatValue)
  151. case *schema_pb.Value_DoubleValue:
  152. return fmt.Sprintf("%f", kind.DoubleValue)
  153. case *schema_pb.Value_BytesValue:
  154. if len(kind.BytesValue) > 50 {
  155. return fmt.Sprintf("bytes[%d] %x...", len(kind.BytesValue), kind.BytesValue[:50])
  156. }
  157. return fmt.Sprintf("bytes[%d] %x", len(kind.BytesValue), kind.BytesValue)
  158. case *schema_pb.Value_StringValue:
  159. if len(kind.StringValue) > 100 {
  160. return fmt.Sprintf("\"%s...\"", kind.StringValue[:100])
  161. }
  162. return fmt.Sprintf("\"%s\"", kind.StringValue)
  163. case *schema_pb.Value_ListValue:
  164. return fmt.Sprintf("list[%d items]", len(kind.ListValue.Values))
  165. case *schema_pb.Value_RecordValue:
  166. return fmt.Sprintf("record[%d fields]", len(kind.RecordValue.Fields))
  167. default:
  168. return "(unknown)"
  169. }
  170. }