| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "log"
- "math/big"
- "math/rand"
- "os"
- "strconv"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/cluster"
- "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
- "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- )
- type UserEvent struct {
- ID int64 `json:"id"`
- UserID int64 `json:"user_id"`
- UserType string `json:"user_type"`
- Action string `json:"action"`
- Status string `json:"status"`
- Amount float64 `json:"amount,omitempty"`
- PreciseAmount string `json:"precise_amount,omitempty"` // Will be converted to DECIMAL
- BirthDate time.Time `json:"birth_date"` // Will be converted to DATE
- Timestamp time.Time `json:"timestamp"`
- Metadata string `json:"metadata,omitempty"`
- }
- type SystemLog struct {
- ID int64 `json:"id"`
- Level string `json:"level"`
- Service string `json:"service"`
- Message string `json:"message"`
- ErrorCode int `json:"error_code,omitempty"`
- Timestamp time.Time `json:"timestamp"`
- }
- type MetricEntry struct {
- ID int64 `json:"id"`
- Name string `json:"name"`
- Value float64 `json:"value"`
- Tags string `json:"tags"`
- Timestamp time.Time `json:"timestamp"`
- }
- type ProductView struct {
- ID int64 `json:"id"`
- ProductID int64 `json:"product_id"`
- UserID int64 `json:"user_id"`
- Category string `json:"category"`
- Price float64 `json:"price"`
- ViewCount int `json:"view_count"`
- Timestamp time.Time `json:"timestamp"`
- }
- func main() {
- // Get SeaweedFS configuration from environment
- masterAddr := getEnv("SEAWEEDFS_MASTER", "localhost:9333")
- filerAddr := getEnv("SEAWEEDFS_FILER", "localhost:8888")
- log.Printf("Creating MQ test data...")
- log.Printf("Master: %s", masterAddr)
- log.Printf("Filer: %s", filerAddr)
- // Wait for SeaweedFS to be ready
- log.Println("Waiting for SeaweedFS to be ready...")
- time.Sleep(10 * time.Second)
- // Create topics and populate with data
- topics := []struct {
- namespace string
- topic string
- generator func() interface{}
- count int
- }{
- {"analytics", "user_events", generateUserEvent, 1000},
- {"analytics", "system_logs", generateSystemLog, 500},
- {"analytics", "metrics", generateMetric, 800},
- {"ecommerce", "product_views", generateProductView, 1200},
- {"ecommerce", "user_events", generateUserEvent, 600},
- {"logs", "application_logs", generateSystemLog, 2000},
- {"logs", "error_logs", generateErrorLog, 300},
- }
- for _, topicConfig := range topics {
- log.Printf("Creating topic %s.%s with %d records...",
- topicConfig.namespace, topicConfig.topic, topicConfig.count)
- err := createTopicData(masterAddr, filerAddr,
- topicConfig.namespace, topicConfig.topic,
- topicConfig.generator, topicConfig.count)
- if err != nil {
- log.Printf("Error creating topic %s.%s: %v",
- topicConfig.namespace, topicConfig.topic, err)
- } else {
- log.Printf("✓ Successfully created %s.%s",
- topicConfig.namespace, topicConfig.topic)
- }
- // Small delay between topics
- time.Sleep(2 * time.Second)
- }
- log.Println("✓ MQ test data creation completed!")
- log.Println("\nCreated namespaces:")
- log.Println(" - analytics (user_events, system_logs, metrics)")
- log.Println(" - ecommerce (product_views, user_events)")
- log.Println(" - logs (application_logs, error_logs)")
- log.Println("\nYou can now test with PostgreSQL clients:")
- log.Println(" psql -h localhost -p 5432 -U seaweedfs -d analytics")
- log.Println(" postgres=> SHOW TABLES;")
- log.Println(" postgres=> SELECT COUNT(*) FROM user_events;")
- }
- // createSchemaForTopic creates a proper RecordType schema based on topic name
- func createSchemaForTopic(topicName string) *schema_pb.RecordType {
- switch topicName {
- case "user_events":
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "user_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "user_type", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "action", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "status", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "amount", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: false},
- {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "metadata", FieldIndex: 7, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: false},
- },
- }
- case "system_logs":
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false},
- {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- },
- }
- case "metrics":
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "name", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "value", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true},
- {Name: "tags", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "timestamp", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- },
- }
- case "product_views":
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "product_id", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "user_id", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "category", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "price", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}, IsRequired: true},
- {Name: "view_count", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: true},
- {Name: "timestamp", FieldIndex: 6, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- },
- }
- case "application_logs", "error_logs":
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "id", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}, IsRequired: true},
- {Name: "level", FieldIndex: 1, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "service", FieldIndex: 2, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "message", FieldIndex: 3, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- {Name: "error_code", FieldIndex: 4, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}, IsRequired: false},
- {Name: "timestamp", FieldIndex: 5, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}, IsRequired: true},
- },
- }
- default:
- // Default generic schema
- return &schema_pb.RecordType{
- Fields: []*schema_pb.Field{
- {Name: "data", FieldIndex: 0, Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BYTES}}, IsRequired: true},
- },
- }
- }
- }
- // convertToDecimal converts a string to decimal format for Parquet logical type
- func convertToDecimal(value string) ([]byte, int32, int32) {
- // Parse the decimal string using big.Rat for precision
- rat := new(big.Rat)
- if _, success := rat.SetString(value); !success {
- return nil, 0, 0
- }
- // Convert to a fixed scale (e.g., 4 decimal places)
- scale := int32(4)
- precision := int32(18) // Total digits
- // Scale the rational number to integer representation
- multiplier := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
- scaled := new(big.Int).Mul(rat.Num(), multiplier)
- scaled.Div(scaled, rat.Denom())
- return scaled.Bytes(), precision, scale
- }
- // convertToRecordValue converts Go structs to RecordValue format
- func convertToRecordValue(data interface{}) (*schema_pb.RecordValue, error) {
- fields := make(map[string]*schema_pb.Value)
- switch v := data.(type) {
- case UserEvent:
- fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
- fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}}
- fields["user_type"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.UserType}}
- fields["action"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Action}}
- fields["status"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Status}}
- fields["amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Amount}}
- // Convert precise amount to DECIMAL logical type
- if v.PreciseAmount != "" {
- if decimal, precision, scale := convertToDecimal(v.PreciseAmount); decimal != nil {
- fields["precise_amount"] = &schema_pb.Value{Kind: &schema_pb.Value_DecimalValue{DecimalValue: &schema_pb.DecimalValue{
- Value: decimal,
- Precision: precision,
- Scale: scale,
- }}}
- }
- }
- // Convert birth date to DATE logical type
- fields["birth_date"] = &schema_pb.Value{Kind: &schema_pb.Value_DateValue{DateValue: &schema_pb.DateValue{
- DaysSinceEpoch: int32(v.BirthDate.Unix() / 86400), // Convert to days since epoch
- }}}
- fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
- TimestampMicros: v.Timestamp.UnixMicro(),
- IsUtc: true,
- }}}
- fields["metadata"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Metadata}}
- case SystemLog:
- fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
- fields["level"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Level}}
- fields["service"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Service}}
- fields["message"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Message}}
- fields["error_code"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ErrorCode)}}
- fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
- TimestampMicros: v.Timestamp.UnixMicro(),
- IsUtc: true,
- }}}
- case MetricEntry:
- fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
- fields["name"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Name}}
- fields["value"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Value}}
- fields["tags"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Tags}}
- fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
- TimestampMicros: v.Timestamp.UnixMicro(),
- IsUtc: true,
- }}}
- case ProductView:
- fields["id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ID}}
- fields["product_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.ProductID}}
- fields["user_id"] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: v.UserID}}
- fields["category"] = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: v.Category}}
- fields["price"] = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: v.Price}}
- fields["view_count"] = &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: int32(v.ViewCount)}}
- fields["timestamp"] = &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{TimestampValue: &schema_pb.TimestampValue{
- TimestampMicros: v.Timestamp.UnixMicro(),
- IsUtc: true,
- }}}
- default:
- // Fallback to JSON for unknown types
- jsonData, err := json.Marshal(data)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal unknown type: %v", err)
- }
- fields["data"] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: jsonData}}
- }
- return &schema_pb.RecordValue{Fields: fields}, nil
- }
- // convertHTTPToGRPC converts HTTP address to gRPC address
- // Follows SeaweedFS convention: gRPC port = HTTP port + 10000
- func convertHTTPToGRPC(httpAddress string) string {
- if strings.Contains(httpAddress, ":") {
- parts := strings.Split(httpAddress, ":")
- if len(parts) == 2 {
- if port, err := strconv.Atoi(parts[1]); err == nil {
- return fmt.Sprintf("%s:%d", parts[0], port+10000)
- }
- }
- }
- // Fallback: return original address if conversion fails
- return httpAddress
- }
- // discoverFiler finds a filer from the master server
- func discoverFiler(masterHTTPAddress string) (string, error) {
- masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
- conn, err := grpc.Dial(masterGRPCAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return "", fmt.Errorf("failed to connect to master at %s: %v", masterGRPCAddress, err)
- }
- defer conn.Close()
- client := master_pb.NewSeaweedClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- })
- if err != nil {
- return "", fmt.Errorf("failed to list filers from master: %v", err)
- }
- if len(resp.ClusterNodes) == 0 {
- return "", fmt.Errorf("no filers found in cluster")
- }
- // Use the first available filer and convert HTTP address to gRPC
- filerHTTPAddress := resp.ClusterNodes[0].Address
- return convertHTTPToGRPC(filerHTTPAddress), nil
- }
- // discoverBroker finds the broker balancer using filer lock mechanism
- func discoverBroker(masterHTTPAddress string) (string, error) {
- // First discover filer from master
- filerAddress, err := discoverFiler(masterHTTPAddress)
- if err != nil {
- return "", fmt.Errorf("failed to discover filer: %v", err)
- }
- conn, err := grpc.Dial(filerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- return "", fmt.Errorf("failed to connect to filer at %s: %v", filerAddress, err)
- }
- defer conn.Close()
- client := filer_pb.NewSeaweedFilerClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{
- Name: pub_balancer.LockBrokerBalancer,
- })
- if err != nil {
- return "", fmt.Errorf("failed to find broker balancer: %v", err)
- }
- return resp.Owner, nil
- }
- func createTopicData(masterAddr, filerAddr, namespace, topicName string,
- generator func() interface{}, count int) error {
- // Create schema based on topic type
- recordType := createSchemaForTopic(topicName)
- // Dynamically discover broker address instead of hardcoded port replacement
- brokerAddress, err := discoverBroker(masterAddr)
- if err != nil {
- // Fallback to hardcoded port replacement if discovery fails
- log.Printf("Warning: Failed to discover broker dynamically (%v), using hardcoded port replacement", err)
- brokerAddress = strings.Replace(masterAddr, ":9333", ":17777", 1)
- }
- // Create publisher configuration
- config := &pub_client.PublisherConfiguration{
- Topic: topic.NewTopic(namespace, topicName),
- PartitionCount: 1,
- Brokers: []string{brokerAddress}, // Use dynamically discovered broker address
- PublisherName: fmt.Sprintf("test-producer-%s-%s", namespace, topicName),
- RecordType: recordType, // Use structured schema
- }
- // Create publisher
- publisher, err := pub_client.NewTopicPublisher(config)
- if err != nil {
- return fmt.Errorf("failed to create publisher: %v", err)
- }
- defer publisher.Shutdown()
- // Generate and publish data
- for i := 0; i < count; i++ {
- data := generator()
- // Convert struct to RecordValue
- recordValue, err := convertToRecordValue(data)
- if err != nil {
- log.Printf("Error converting data to RecordValue: %v", err)
- continue
- }
- // Publish structured record
- err = publisher.PublishRecord([]byte(fmt.Sprintf("key-%d", i)), recordValue)
- if err != nil {
- log.Printf("Error publishing message %d: %v", i+1, err)
- continue
- }
- // Small delay every 100 messages
- if (i+1)%100 == 0 {
- log.Printf(" Published %d/%d messages to %s.%s",
- i+1, count, namespace, topicName)
- time.Sleep(100 * time.Millisecond)
- }
- }
- // Finish publishing
- err = publisher.FinishPublish()
- if err != nil {
- return fmt.Errorf("failed to finish publishing: %v", err)
- }
- return nil
- }
- func generateUserEvent() interface{} {
- userTypes := []string{"premium", "standard", "trial", "enterprise"}
- actions := []string{"login", "logout", "purchase", "view", "search", "click", "download"}
- statuses := []string{"active", "inactive", "pending", "completed", "failed"}
- // Generate a birth date between 1970 and 2005 (18+ years old)
- birthYear := 1970 + rand.Intn(35)
- birthMonth := 1 + rand.Intn(12)
- birthDay := 1 + rand.Intn(28) // Keep it simple, avoid month-specific day issues
- birthDate := time.Date(birthYear, time.Month(birthMonth), birthDay, 0, 0, 0, 0, time.UTC)
- // Generate a precise amount as a string with 4 decimal places
- preciseAmount := fmt.Sprintf("%.4f", rand.Float64()*10000)
- return UserEvent{
- ID: rand.Int63n(1000000) + 1,
- UserID: rand.Int63n(10000) + 1,
- UserType: userTypes[rand.Intn(len(userTypes))],
- Action: actions[rand.Intn(len(actions))],
- Status: statuses[rand.Intn(len(statuses))],
- Amount: rand.Float64() * 1000,
- PreciseAmount: preciseAmount,
- BirthDate: birthDate,
- Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*30)) * time.Second),
- Metadata: fmt.Sprintf("{\"session_id\":\"%d\"}", rand.Int63n(100000)),
- }
- }
- func generateSystemLog() interface{} {
- levels := []string{"debug", "info", "warning", "error", "critical"}
- services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
- messages := []string{
- "Request processed successfully",
- "User authentication completed",
- "Payment transaction initiated",
- "Database connection established",
- "Cache miss for key",
- "API rate limit exceeded",
- "Service health check passed",
- }
- return SystemLog{
- ID: rand.Int63n(1000000) + 1,
- Level: levels[rand.Intn(len(levels))],
- Service: services[rand.Intn(len(services))],
- Message: messages[rand.Intn(len(messages))],
- ErrorCode: rand.Intn(1000),
- Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second),
- }
- }
- func generateErrorLog() interface{} {
- levels := []string{"error", "critical", "fatal"}
- services := []string{"auth-service", "payment-service", "user-service", "notification-service", "api-gateway"}
- messages := []string{
- "Database connection failed",
- "Authentication token expired",
- "Payment processing error",
- "Service unavailable",
- "Memory limit exceeded",
- "Timeout waiting for response",
- "Invalid request parameters",
- }
- return SystemLog{
- ID: rand.Int63n(1000000) + 1,
- Level: levels[rand.Intn(len(levels))],
- Service: services[rand.Intn(len(services))],
- Message: messages[rand.Intn(len(messages))],
- ErrorCode: rand.Intn(100) + 400, // 400-499 error codes
- Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*7)) * time.Second),
- }
- }
- func generateMetric() interface{} {
- names := []string{"cpu_usage", "memory_usage", "disk_usage", "request_latency", "error_rate", "throughput"}
- tags := []string{
- "service=web,region=us-east",
- "service=api,region=us-west",
- "service=db,region=eu-central",
- "service=cache,region=asia-pacific",
- }
- return MetricEntry{
- ID: rand.Int63n(1000000) + 1,
- Name: names[rand.Intn(len(names))],
- Value: rand.Float64() * 100,
- Tags: tags[rand.Intn(len(tags))],
- Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*3)) * time.Second),
- }
- }
- func generateProductView() interface{} {
- categories := []string{"electronics", "books", "clothing", "home", "sports", "automotive"}
- return ProductView{
- ID: rand.Int63n(1000000) + 1,
- ProductID: rand.Int63n(10000) + 1,
- UserID: rand.Int63n(5000) + 1,
- Category: categories[rand.Intn(len(categories))],
- Price: rand.Float64() * 500,
- ViewCount: rand.Intn(100) + 1,
- Timestamp: time.Now().Add(-time.Duration(rand.Intn(86400*14)) * time.Second),
- }
- }
- func getEnv(key, defaultValue string) string {
- if value, exists := os.LookupEnv(key); exists {
- return value
- }
- return defaultValue
- }
|