producer.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "math/big"
  8. "math/rand"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/seaweedfs/seaweedfs/weed/cluster"
  14. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  15. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  16. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/credentials/insecure"
  22. )
  23. type UserEvent struct {
  24. ID int64 `json:"id"`
  25. UserID int64 `json:"user_id"`
  26. UserType string `json:"user_type"`
  27. Action string `json:"action"`
  28. Status string `json:"status"`
  29. Amount float64 `json:"amount,omitempty"`
  30. PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL
  31. BirthDate time.Time `json:"birth_date"` // Will be converted to DATE
  32. Timestamp time.Time `json:"timestamp"`
  33. Metadata string `json:"metadata,omitempty"`
  34. }
  35. type SystemLog struct {
  36. ID int64 `json:"id"`
  37. Level string `json:"level"`
  38. Service string `json:"service"`
  39. Message string `json:"message"`
  40. ErrorCode int `json:"error_code,omitempty"`
  41. Timestamp time.Time `json:"timestamp"`
  42. }
  43. type MetricEntry struct {
  44. ID int64 `json:"id"`
  45. Name string `json:"name"`
  46. Value float64 `json:"value"`
  47. Tags string `json:"tags"`
  48. Timestamp time.Time `json:"timestamp"`
  49. }
  50. type ProductView struct {
  51. ID int64 `json:"id"`
  52. ProductID int64 `json:"product_id"`
  53. UserID int64 `json:"user_id"`
  54. Category string `json:"category"`
  55. Price float64 `json:"price"`
  56. ViewCount int `json:"view_count"`
  57. Timestamp time.Time `json:"timestamp"`
  58. }
  59. func main() {
  60. // Get SeaweedFS configuration from environment
  61. masterAddr := getEnv("SEAWEEDFS_MASTER", "localhost:9333")
  62. filerAddr := getEnv("SEAWEEDFS_FILER", "localhost:8888")
  63. log.Printf("Creating MQ test data...")
  64. log.Printf("Master: %s", masterAddr)
  65. log.Printf("Filer: %s", filerAddr)
  66. // Wait for SeaweedFS to be ready
  67. log.Println("Waiting for SeaweedFS to be ready...")
  68. time.Sleep(10 * time.Second)
  69. // Create topics and populate with data
  70. topics := []struct {
  71. namespace string
  72. topic string
  73. generator func() interface{}
  74. count int
  75. }{
  76. {"analytics", "user_events", generateUserEvent, 1000},
  77. {"analytics", "system_logs", generateSystemLog, 500},
  78. {"analytics", "metrics", generateMetric, 800},
  79. {"ecommerce", "product_views", generateProductView, 1200},
  80. {"ecommerce", "user_events", generateUserEvent, 600},
  81. {"logs", "application_logs", generateSystemLog, 2000},
  82. {"logs", "error_logs", generateErrorLog, 300},
  83. }
  84. for _, topicConfig := range topics {
  85. log.Printf("Creating topic %s.%s with %d records...",
  86. topicConfig.namespace, topicConfig.topic, topicConfig.count)
  87. err := createTopicData(masterAddr, filerAddr,
  88. topicConfig.namespace, topicConfig.topic,
  89. topicConfig.generator, topicConfig.count)
  90. if err != nil {
  91. log.Printf("Error creating topic %s.%s: %v",
  92. topicConfig.namespace, topicConfig.topic, err)
  93. } else {
  94. log.Printf("✓ Successfully created %s.%s",
  95. topicConfig.namespace, topicConfig.topic)
  96. }
  97. // Small delay between topics
  98. time.Sleep(2 * time.Second)
  99. }
  100. log.Println("✓ MQ test data creation completed!")
  101. log.Println("\nCreated namespaces:")
  102. log.Println(" - analytics (user_events, system_logs, metrics)")
  103. log.Println(" - ecommerce (product_views, user_events)")
  104. log.Println(" - logs (application_logs, error_logs)")
  105. log.Println("\nYou can now test with PostgreSQL clients:")
  106. log.Println(" psql -h localhost -p 5432 -U seaweedfs -d analytics")
  107. log.Println(" postgres=> SHOW TABLES;")
  108. log.Println(" postgres=> SELECT COUNT(*) FROM user_events;")
  109. }
  110. // createSchemaForTopic creates a proper RecordType schema based on topic name
  111. func createSchemaForTopic(topicName string) *schema_pb.RecordType {
  112. switch topicName {
  113. case "user_events":
  114. return &schema_pb.RecordType{
  115. Fields: []*schema_pb.Field{
  116. {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  117. {Name: "user_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  118. {Name: "user_type", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  119. {Name: "action", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  120. {Name: "status", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  121. {Name: "amount", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: false},
  122. {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  123. {Name: "metadata", FieldIndex: 7, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: false},
  124. },
  125. }
  126. case "system_logs":
  127. return &schema_pb.RecordType{
  128. Fields: []*schema_pb.Field{
  129. {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  130. {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  131. {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  132. {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  133. {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false},
  134. {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  135. },
  136. }
  137. case "metrics":
  138. return &schema_pb.RecordType{
  139. Fields: []*schema_pb.Field{
  140. {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  141. {Name: "name", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  142. {Name: "value", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true},
  143. {Name: "tags", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  144. {Name: "timestamp", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  145. },
  146. }
  147. case "product_views":
  148. return &schema_pb.RecordType{
  149. Fields: []*schema_pb.Field{
  150. {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  151. {Name: "product_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  152. {Name: "user_id", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  153. {Name: "category", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  154. {Name: "price", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true},
  155. {Name: "view_count", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: true},
  156. {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  157. },
  158. }
  159. case "application_logs", "error_logs":
  160. return &schema_pb.RecordType{
  161. Fields: []*schema_pb.Field{
  162. {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
  163. {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  164. {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  165. {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  166. {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false},
  167. {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
  168. },
  169. }
  170. default:
  171. // Default generic schema
  172. return &schema_pb.RecordType{
  173. Fields: []*schema_pb.Field{
  174. {Name: "data", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}, IsRequired: true},
  175. },
  176. }
  177. }
  178. }
  179. // convertToDecimal converts a string to decimal format for Parquet logical type
  180. func convertToDecimal(value string) ([]byte, int32, int32) {
  181. // Parse the decimal string using big.Rat for precision
  182. rat := new(big.Rat)
  183. if _, success := rat.SetString(value); !success {
  184. return nil, 0, 0
  185. }
  186. // Convert to a fixed scale (e.g., 4 decimal places)
  187. scale := int32(4)
  188. precision := int32(18) // Total digits
  189. // Scale the rational number to integer representation
  190. multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
  191. scaled := new(big.Int).Mul(rat.Num(), multiplier)
  192. scaled.Div(scaled, rat.Denom())
  193. return scaled.Bytes(), precision, scale
  194. }
  195. // convertToRecordValue converts Go structs to RecordValue format
  196. func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
  197. fields := make(map[string]*schema_pb.Value)
  198. switch v := data.(type) {
  199. case UserEvent:
  200. fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
  201. fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}}
  202. fields["user_type"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.UserType}}
  203. fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}}
  204. fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}}
  205. fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}}
  206. // Convert precise amount to DECIMAL logical type
  207. if v.PreciseAmount != "" {
  208. if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil {
  209. fields["precise_amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DecimalValue{DecimalValue: &schema_pb.DecimalValue{
  210. Value: decimal,
  211. Precision: precision,
  212. Scale: scale,
  213. }}}
  214. }
  215. }
  216. // Convert birth date to DATE logical type
  217. fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{
  218. DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch
  219. }}}
  220. fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
  221. TimestampMicros: v.Timestamp.UnixMicro(),
  222. IsUtc: true,
  223. }}}
  224. fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}}
  225. case SystemLog:
  226. fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
  227. fields["level"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Level}}
  228. fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}}
  229. fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}}
  230. fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}}
  231. fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
  232. TimestampMicros: v.Timestamp.UnixMicro(),
  233. IsUtc: true,
  234. }}}
  235. case MetricEntry:
  236. fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
  237. fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}}
  238. fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}}
  239. fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}}
  240. fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
  241. TimestampMicros: v.Timestamp.UnixMicro(),
  242. IsUtc: true,
  243. }}}
  244. case ProductView:
  245. fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
  246. fields["product_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ProductID}}
  247. fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}}
  248. fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}}
  249. fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}}
  250. fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}}
  251. fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
  252. TimestampMicros: v.Timestamp.UnixMicro(),
  253. IsUtc: true,
  254. }}}
  255. default:
  256. // Fallback to JSON for unknown types
  257. jsonData, err := json.Marshal(data)
  258. if err != nil {
  259. return nil, fmt.Errorf("failed to marshal unknown type: %v", err)
  260. }
  261. fields["data"] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: jsonData}}
  262. }
  263. return &schema_pb.RecordValue{Fields: fields}, nil
  264. }
  265. // convertHTTPToGRPC converts HTTP address to gRPC address
  266. // Follows SeaweedFS convention: gRPC port = HTTP port + 10000
  267. func convertHTTPToGRPC(httpAddress string) string {
  268. if strings.Contains(httpAddress, ":") {
  269. parts := strings.Split(httpAddress, ":")
  270. if len(parts) == 2 {
  271. if port, err := strconv.Atoi(parts[1]); err == nil {
  272. return fmt.Sprintf("%s:%d", parts[0], port+10000)
  273. }
  274. }
  275. }
  276. // Fallback: return original address if conversion fails
  277. return httpAddress
  278. }
  279. // discoverFiler finds a filer from the master server
  280. func discoverFiler(masterHTTPAddress string) (string, error) {
  281. masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
  282. conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
  283. if err != nil {
  284. return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err)
  285. }
  286. defer conn.Close()
  287. client := master_pb.NewSeaweedClient(conn)
  288. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  289. defer cancel()
  290. resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
  291. ClientType: cluster.FilerType,
  292. })
  293. if err != nil {
  294. return "", fmt.Errorf("failed to list filers from master: %v", err)
  295. }
  296. if len(resp.ClusterNodes) == 0 {
  297. return "", fmt.Errorf("no filers found in cluster")
  298. }
  299. // Use the first available filer and convert HTTP address to gRPC
  300. filerHTTPAddress := resp.ClusterNodes[0].Address
  301. return convertHTTPToGRPC(filerHTTPAddress), nil
  302. }
  303. // discoverBroker finds the broker balancer using filer lock mechanism
  304. func discoverBroker(masterHTTPAddress string) (string, error) {
  305. // First discover filer from master
  306. filerAddress, err := discoverFiler(masterHTTPAddress)
  307. if err != nil {
  308. return "", fmt.Errorf("failed to discover filer: %v", err)
  309. }
  310. conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
  311. if err != nil {
  312. return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err)
  313. }
  314. defer conn.Close()
  315. client := filer_pb.NewSeaweedFilerClient(conn)
  316. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  317. defer cancel()
  318. resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{
  319. Name: pub_balancer.LockBrokerBalancer,
  320. })
  321. if err != nil {
  322. return "", fmt.Errorf("failed to find broker balancer: %v", err)
  323. }
  324. return resp.Owner, nil
  325. }
  326. func createTopicData(masterAddr, filerAddr, namespace, topicName string,
  327. generator func() interface{}, count int) error {
  328. // Create schema based on topic type
  329. recordType := createSchemaForTopic(topicName)
  330. // Dynamically discover broker address instead of hardcoded port replacement
  331. brokerAddress, err := discoverBroker(masterAddr)
  332. if err != nil {
  333. // Fallback to hardcoded port replacement if discovery fails
  334. log.Printf("Warning: Failed to discover broker dynamically (%v), using hardcoded port replacement", err)
  335. brokerAddress = strings.Replace(masterAddr, ":9333", ":17777", 1)
  336. }
  337. // Create publisher configuration
  338. config := &pub_client.PublisherConfiguration{
  339. Topic: topic.NewTopic(namespace, topicName),
  340. PartitionCount: 1,
  341. Brokers: []string{brokerAddress}, // Use dynamically discovered broker address
  342. PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName),
  343. RecordType: recordType, // Use structured schema
  344. }
  345. // Create publisher
  346. publisher, err := pub_client.NewTopicPublisher(config)
  347. if err != nil {
  348. return fmt.Errorf("failed to create publisher: %v", err)
  349. }
  350. defer publisher.Shutdown()
  351. // Generate and publish data
  352. for i := 0; i < count; i++ {
  353. data := generator()
  354. // Convert struct to RecordValue
  355. recordValue, err := convertToRecordValue(data)
  356. if err != nil {
  357. log.Printf("Error converting data to RecordValue: %v", err)
  358. continue
  359. }
  360. // Publish structured record
  361. err = publisher.PublishRecord([]byte(fmt.Sprintf("key-%d", i)), recordValue)
  362. if err != nil {
  363. log.Printf("Error publishing message %d: %v", i+1, err)
  364. continue
  365. }
  366. // Small delay every 100 messages
  367. if (i+1)%100 == 0 {
  368. log.Printf(" Published %d/%d messages to %s.%s",
  369. i+1, count, namespace, topicName)
  370. time.Sleep(100 * time.Millisecond)
  371. }
  372. }
  373. // Finish publishing
  374. err = publisher.FinishPublish()
  375. if err != nil {
  376. return fmt.Errorf("failed to finish publishing: %v", err)
  377. }
  378. return nil
  379. }
  380. func generateUserEvent() interface{} {
  381. userTypes := []string{"premium", "standard", "trial", "enterprise"}
  382. actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"}
  383. statuses := []string{"active", "inactive", "pending", "completed", "failed"}
  384. // Generate a birth date between 1970 and 2005 (18+ years old)
  385. birthYear := 1970 + rand.Intn(35)
  386. birthMonth := 1 + rand.Intn(12)
  387. birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues
  388. birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC)
  389. // Generate a precise amount as a string with 4 decimal places
  390. preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000)
  391. return UserEvent{
  392. ID: rand.Int63n(1000000) + 1,
  393. UserID: rand.Int63n(10000) + 1,
  394. UserType: userTypes[rand.Intn(len(userTypes))],
  395. Action: actions[rand.Intn(len(actions))],
  396. Status: statuses[rand.Intn(len(statuses))],
  397. Amount: rand.Float64() * 1000,
  398. PreciseAmount: preciseAmount,
  399. BirthDate: birthDate,
  400. Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second),
  401. Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)),
  402. }
  403. }
  404. func generateSystemLog() interface{} {
  405. levels := []string{"debug", "info", "warning", "error", "critical"}
  406. services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
  407. messages := []string{
  408. "Request processed successfully",
  409. "User authentication completed",
  410. "Payment transaction initiated",
  411. "Database connection established",
  412. "Cache miss for key",
  413. "API rate limit exceeded",
  414. "Service health check passed",
  415. }
  416. return SystemLog{
  417. ID: rand.Int63n(1000000) + 1,
  418. Level: levels[rand.Intn(len(levels))],
  419. Service: services[rand.Intn(len(services))],
  420. Message: messages[rand.Intn(len(messages))],
  421. ErrorCode: rand.Intn(1000),
  422. Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second),
  423. }
  424. }
  425. func generateErrorLog() interface{} {
  426. levels := []string{"error", "critical", "fatal"}
  427. services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
  428. messages := []string{
  429. "Database connection failed",
  430. "Authentication token expired",
  431. "Payment processing error",
  432. "Service unavailable",
  433. "Memory limit exceeded",
  434. "Timeout waiting for response",
  435. "Invalid request parameters",
  436. }
  437. return SystemLog{
  438. ID: rand.Int63n(1000000) + 1,
  439. Level: levels[rand.Intn(len(levels))],
  440. Service: services[rand.Intn(len(services))],
  441. Message: messages[rand.Intn(len(messages))],
  442. ErrorCode: rand.Intn(100) + 400, // 400-499 error codes
  443. Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second),
  444. }
  445. }
  446. func generateMetric() interface{} {
  447. names := []string{"cpu_usage", "memory_usage", "disk_usage", "request_latency", "error_rate", "throughput"}
  448. tags := []string{
  449. "service=web,region=us-east",
  450. "service=api,region=us-west",
  451. "service=db,region=eu-central",
  452. "service=cache,region=asia-pacific",
  453. }
  454. return MetricEntry{
  455. ID: rand.Int63n(1000000) + 1,
  456. Name: names[rand.Intn(len(names))],
  457. Value: rand.Float64() * 100,
  458. Tags: tags[rand.Intn(len(tags))],
  459. Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*3)) * time.Second),
  460. }
  461. }
  462. func generateProductView() interface{} {
  463. categories := []string{"electronics", "books", "clothing", "home", "sports", "automotive"}
  464. return ProductView{
  465. ID: rand.Int63n(1000000) + 1,
  466. ProductID: rand.Int63n(10000) + 1,
  467. UserID: rand.Int63n(5000) + 1,
  468. Category: categories[rand.Intn(len(categories))],
  469. Price: rand.Float64() * 500,
  470. ViewCount: rand.Intn(100) + 1,
  471. Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*14)) * time.Second),
  472. }
  473. }
  474. func getEnv(key, defaultValue string) string {
  475. if value, exists := os.LookupEnv(key); exists {
  476. return value
  477. }
  478. return defaultValue
  479. }