| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- )
- var (
- agentAddr = flag.String("agent", "localhost:16777", "MQ agent address")
- topicNamespace = flag.String("namespace", "test", "topic namespace")
- topicName = flag.String("topic", "test-topic", "topic name")
- consumerGroup = flag.String("group", "test-consumer-group", "consumer group name")
- consumerGroupInstanceId = flag.String("instance", "test-consumer-1", "consumer group instance id")
- maxPartitions = flag.Int("max-partitions", 10, "maximum number of partitions to consume")
- slidingWindowSize = flag.Int("window-size", 100, "sliding window size for concurrent processing")
- offsetType = flag.String("offset", "latest", "offset type: earliest, latest, timestamp")
- offsetTsNs = flag.Int64("offset-ts", 0, "offset timestamp in nanoseconds (for timestamp offset type)")
- showMessages = flag.Bool("show-messages", true, "show consumed messages")
- logProgress = flag.Bool("log-progress", true, "log progress every 10 messages")
- filter = flag.String("filter", "", "message filter")
- )
- func main() {
- flag.Parse()
- fmt.Printf("Starting message consumer:\n")
- fmt.Printf(" Agent: %s\n", *agentAddr)
- fmt.Printf(" Topic: %s.%s\n", *topicNamespace, *topicName)
- fmt.Printf(" Consumer Group: %s\n", *consumerGroup)
- fmt.Printf(" Consumer Instance: %s\n", *consumerGroupInstanceId)
- fmt.Printf(" Max Partitions: %d\n", *maxPartitions)
- fmt.Printf(" Sliding Window Size: %d\n", *slidingWindowSize)
- fmt.Printf(" Offset Type: %s\n", *offsetType)
- fmt.Printf(" Filter: %s\n", *filter)
- // Create topic
- topicObj := topic.NewTopic(*topicNamespace, *topicName)
- // Determine offset type
- var pbOffsetType schema_pb.OffsetType
- switch *offsetType {
- case "earliest":
- pbOffsetType = schema_pb.OffsetType_RESET_TO_EARLIEST
- case "latest":
- pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
- case "timestamp":
- pbOffsetType = schema_pb.OffsetType_EXACT_TS_NS
- default:
- pbOffsetType = schema_pb.OffsetType_RESET_TO_LATEST
- }
- // Create subscribe option
- option := &agent_client.SubscribeOption{
- ConsumerGroup: *consumerGroup,
- ConsumerGroupInstanceId: *consumerGroupInstanceId,
- Topic: topicObj,
- OffsetType: pbOffsetType,
- OffsetTsNs: *offsetTsNs,
- Filter: *filter,
- MaxSubscribedPartitions: int32(*maxPartitions),
- SlidingWindowSize: int32(*slidingWindowSize),
- }
- // Create subscribe session
- session, err := agent_client.NewSubscribeSession(*agentAddr, option)
- if err != nil {
- log.Fatalf("Failed to create subscribe session: %v", err)
- }
- defer session.CloseSession()
- // Statistics
- var messageCount int64
- var mu sync.Mutex
- startTime := time.Now()
- // Handle graceful shutdown
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- // Channel to signal completion
- done := make(chan error, 1)
- // Start consuming messages
- fmt.Printf("\nStarting to consume messages...\n")
- go func() {
- err := session.SubscribeMessageRecord(
- // onEachMessageFn
- func(key []byte, record *schema_pb.RecordValue) {
- mu.Lock()
- messageCount++
- currentCount := messageCount
- mu.Unlock()
- if *showMessages {
- fmt.Printf("Received message: key=%s\n", string(key))
- printRecordValue(record)
- }
- if *logProgress && currentCount%10 == 0 {
- elapsed := time.Since(startTime)
- rate := float64(currentCount) / elapsed.Seconds()
- fmt.Printf("Consumed %d messages (%.2f msg/sec)\n", currentCount, rate)
- }
- },
- // onCompletionFn
- func() {
- fmt.Printf("Subscription completed\n")
- done <- nil
- },
- )
- if err != nil {
- done <- err
- }
- }()
- // Wait for signal or completion
- select {
- case <-sigChan:
- fmt.Printf("\nReceived shutdown signal, stopping consumer...\n")
- case err := <-done:
- if err != nil {
- log.Printf("Subscription error: %v", err)
- }
- }
- // Print final statistics
- mu.Lock()
- finalCount := messageCount
- mu.Unlock()
- duration := time.Since(startTime)
- fmt.Printf("Consumed %d messages in %v\n", finalCount, duration)
- if duration.Seconds() > 0 {
- fmt.Printf("Average throughput: %.2f messages/sec\n", float64(finalCount)/duration.Seconds())
- }
- }
- func printRecordValue(record *schema_pb.RecordValue) {
- if record == nil || record.Fields == nil {
- fmt.Printf(" (empty record)\n")
- return
- }
- for fieldName, value := range record.Fields {
- fmt.Printf(" %s: %s\n", fieldName, formatValue(value))
- }
- }
- func formatValue(value *schema_pb.Value) string {
- if value == nil {
- return "(nil)"
- }
- switch kind := value.Kind.(type) {
- case *schema_pb.Value_BoolValue:
- return fmt.Sprintf("%t", kind.BoolValue)
- case *schema_pb.Value_Int32Value:
- return fmt.Sprintf("%d", kind.Int32Value)
- case *schema_pb.Value_Int64Value:
- return fmt.Sprintf("%d", kind.Int64Value)
- case *schema_pb.Value_FloatValue:
- return fmt.Sprintf("%f", kind.FloatValue)
- case *schema_pb.Value_DoubleValue:
- return fmt.Sprintf("%f", kind.DoubleValue)
- case *schema_pb.Value_BytesValue:
- if len(kind.BytesValue) > 50 {
- return fmt.Sprintf("bytes[%d] %x...", len(kind.BytesValue), kind.BytesValue[:50])
- }
- return fmt.Sprintf("bytes[%d] %x", len(kind.BytesValue), kind.BytesValue)
- case *schema_pb.Value_StringValue:
- if len(kind.StringValue) > 100 {
- return fmt.Sprintf("\"%s...\"", kind.StringValue[:100])
- }
- return fmt.Sprintf("\"%s\"", kind.StringValue)
- case *schema_pb.Value_ListValue:
- return fmt.Sprintf("list[%d items]", len(kind.ListValue.Values))
- case *schema_pb.Value_RecordValue:
- return fmt.Sprintf("record[%d fields]", len(kind.RecordValue.Fields))
- default:
- return "(unknown)"
- }
- }
|