mq_agent.go 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package command
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/agent"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  5. "google.golang.org/grpc/reflection"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/security"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. )
  11. var (
  12. mqAgentOptions MessageQueueAgentOptions
  13. )
  14. type MessageQueueAgentOptions struct {
  15. brokers []pb.ServerAddress
  16. brokersString *string
  17. filerGroup *string
  18. ip *string
  19. port *int
  20. }
  21. func init() {
  22. cmdMqAgent.Run = runMqAgent // break init cycle
  23. mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")
  24. mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address")
  25. mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
  26. }
  27. var cmdMqAgent = &Command{
  28. UsageLine: "mq.agent [-port=16777] [-broker=<ip:port>]",
  29. Short: "<WIP> start a message queue agent",
  30. Long: `start a message queue agent
  31. The agent runs on local server to accept gRPC calls to write or read messages.
  32. The messages are sent to message queue brokers.
  33. `,
  34. }
  35. func runMqAgent(cmd *Command, args []string) bool {
  36. util.LoadSecurityConfiguration()
  37. mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses()
  38. return mqAgentOptions.startQueueAgent()
  39. }
  40. func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
  41. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")
  42. agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
  43. SeedBrokers: mqAgentOpt.brokers,
  44. }, grpcDialOption)
  45. // start grpc listener
  46. grpcL, localL, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
  47. if err != nil {
  48. glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
  49. }
  50. // Create main gRPC server
  51. grpcS := pb.NewGrpcServer()
  52. mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
  53. reflection.Register(grpcS)
  54. // Start localhost listener if available
  55. if localL != nil {
  56. localGrpcS := pb.NewGrpcServer()
  57. mq_agent_pb.RegisterSeaweedMessagingAgentServer(localGrpcS, agentServer)
  58. reflection.Register(localGrpcS)
  59. go func() {
  60. glog.V(0).Infof("MQ Agent listening on localhost:%d", *mqAgentOpt.port)
  61. if err := localGrpcS.Serve(localL); err != nil {
  62. glog.Errorf("MQ Agent localhost listener error: %v", err)
  63. }
  64. }()
  65. }
  66. glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port)
  67. grpcS.Serve(grpcL)
  68. return true
  69. }