main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/client/agent_client"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. )
  11. var (
  12. agentAddr = flag.String("agent", "localhost:16777", "MQ agent address")
  13. topicNamespace = flag.String("namespace", "test", "topic namespace")
  14. topicName = flag.String("topic", "test-topic", "topic name")
  15. partitionCount = flag.Int("partitions", 4, "number of partitions")
  16. messageCount = flag.Int("messages", 100, "number of messages to produce")
  17. publisherName = flag.String("publisher", "test-producer", "publisher name")
  18. messageSize = flag.Int("size", 1024, "message size in bytes")
  19. interval = flag.Duration("interval", 100*time.Millisecond, "interval between messages")
  20. )
  21. // TestMessage represents the structure of messages we'll be sending
  22. type TestMessage struct {
  23. ID int64 `json:"id"`
  24. Message string `json:"message"`
  25. Payload []byte `json:"payload"`
  26. Timestamp int64 `json:"timestamp"`
  27. }
  28. func main() {
  29. flag.Parse()
  30. fmt.Printf("Starting message producer:\n")
  31. fmt.Printf(" Agent: %s\n", *agentAddr)
  32. fmt.Printf(" Topic: %s.%s\n", *topicNamespace, *topicName)
  33. fmt.Printf(" Partitions: %d\n", *partitionCount)
  34. fmt.Printf(" Messages: %d\n", *messageCount)
  35. fmt.Printf(" Publisher: %s\n", *publisherName)
  36. fmt.Printf(" Message Size: %d bytes\n", *messageSize)
  37. fmt.Printf(" Interval: %v\n", *interval)
  38. // Create an instance of the message struct to generate schema from
  39. messageInstance := TestMessage{}
  40. // Automatically generate RecordType from the struct
  41. recordType := schema.StructToSchema(messageInstance)
  42. if recordType == nil {
  43. log.Fatalf("Failed to generate schema from struct")
  44. }
  45. fmt.Printf("\nGenerated schema with %d fields:\n", len(recordType.Fields))
  46. for _, field := range recordType.Fields {
  47. fmt.Printf(" - %s: %s\n", field.Name, getTypeString(field.Type))
  48. }
  49. topicSchema := schema.NewSchema(*topicNamespace, *topicName, recordType)
  50. // Create publish session
  51. session, err := agent_client.NewPublishSession(*agentAddr, topicSchema, *partitionCount, *publisherName)
  52. if err != nil {
  53. log.Fatalf("Failed to create publish session: %v", err)
  54. }
  55. defer session.CloseSession()
  56. // Create message payload
  57. payload := make([]byte, *messageSize)
  58. for i := range payload {
  59. payload[i] = byte(i % 256)
  60. }
  61. // Start producing messages
  62. fmt.Printf("\nStarting to produce messages...\n")
  63. startTime := time.Now()
  64. for i := 0; i < *messageCount; i++ {
  65. key := fmt.Sprintf("key-%d", i)
  66. // Create a message struct
  67. message := TestMessage{
  68. ID: int64(i),
  69. Message: fmt.Sprintf("This is message number %d", i),
  70. Payload: payload[:min(100, len(payload))], // First 100 bytes
  71. Timestamp: time.Now().UnixNano(),
  72. }
  73. // Convert struct to RecordValue
  74. record := structToRecordValue(message)
  75. err := session.PublishMessageRecord([]byte(key), record)
  76. if err != nil {
  77. log.Printf("Failed to publish message %d: %v", i, err)
  78. continue
  79. }
  80. if (i+1)%10 == 0 {
  81. fmt.Printf("Published %d messages\n", i+1)
  82. }
  83. if *interval > 0 {
  84. time.Sleep(*interval)
  85. }
  86. }
  87. duration := time.Since(startTime)
  88. fmt.Printf("\nCompleted producing %d messages in %v\n", *messageCount, duration)
  89. fmt.Printf("Throughput: %.2f messages/sec\n", float64(*messageCount)/duration.Seconds())
  90. }
  91. // Helper function to convert struct to RecordValue
  92. func structToRecordValue(msg TestMessage) *schema_pb.RecordValue {
  93. return &schema_pb.RecordValue{
  94. Fields: map[string]*schema_pb.Value{
  95. "ID": {
  96. Kind: &schema_pb.Value_Int64Value{
  97. Int64Value: msg.ID,
  98. },
  99. },
  100. "Message": {
  101. Kind: &schema_pb.Value_StringValue{
  102. StringValue: msg.Message,
  103. },
  104. },
  105. "Payload": {
  106. Kind: &schema_pb.Value_BytesValue{
  107. BytesValue: msg.Payload,
  108. },
  109. },
  110. "Timestamp": {
  111. Kind: &schema_pb.Value_Int64Value{
  112. Int64Value: msg.Timestamp,
  113. },
  114. },
  115. },
  116. }
  117. }
  118. func getTypeString(t *schema_pb.Type) string {
  119. switch kind := t.Kind.(type) {
  120. case *schema_pb.Type_ScalarType:
  121. switch kind.ScalarType {
  122. case schema_pb.ScalarType_BOOL:
  123. return "bool"
  124. case schema_pb.ScalarType_INT32:
  125. return "int32"
  126. case schema_pb.ScalarType_INT64:
  127. return "int64"
  128. case schema_pb.ScalarType_FLOAT:
  129. return "float"
  130. case schema_pb.ScalarType_DOUBLE:
  131. return "double"
  132. case schema_pb.ScalarType_BYTES:
  133. return "bytes"
  134. case schema_pb.ScalarType_STRING:
  135. return "string"
  136. }
  137. case *schema_pb.Type_ListType:
  138. return fmt.Sprintf("list<%s>", getTypeString(kind.ListType.ElementType))
  139. case *schema_pb.Type_RecordType:
  140. return "record"
  141. }
  142. return "unknown"
  143. }
  144. func min(a, b int) int {
  145. if a < b {
  146. return a
  147. }
  148. return b
  149. }