client_management.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package dash
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/cluster"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  13. )
  14. // WithMasterClient executes a function with a master client connection
  15. func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error {
  16. return s.masterClient.WithClient(false, f)
  17. }
  18. // WithFilerClient executes a function with a filer client connection
  19. func (s *AdminServer) WithFilerClient(f func(client filer_pb.SeaweedFilerClient) error) error {
  20. filerAddr := s.GetFilerAddress()
  21. if filerAddr == "" {
  22. return fmt.Errorf("no filer available")
  23. }
  24. return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddr), s.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  25. return f(client)
  26. })
  27. }
  28. // WithVolumeServerClient executes a function with a volume server client connection
  29. func (s *AdminServer) WithVolumeServerClient(address pb.ServerAddress, f func(client volume_server_pb.VolumeServerClient) error) error {
  30. return operation.WithVolumeServerClient(false, address, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  31. return f(client)
  32. })
  33. }
  34. // GetFilerAddress returns a filer address, discovering from masters if needed
  35. func (s *AdminServer) GetFilerAddress() string {
  36. // Discover filers from masters
  37. filers := s.getDiscoveredFilers()
  38. if len(filers) > 0 {
  39. return filers[0] // Return the first available filer
  40. }
  41. return ""
  42. }
  43. // getDiscoveredFilers returns cached filers or discovers them from masters
  44. func (s *AdminServer) getDiscoveredFilers() []string {
  45. // Check if cache is still valid
  46. if time.Since(s.lastFilerUpdate) < s.filerCacheExpiration && len(s.cachedFilers) > 0 {
  47. return s.cachedFilers
  48. }
  49. // Discover filers from masters
  50. var filers []string
  51. err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
  52. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  53. ClientType: cluster.FilerType,
  54. })
  55. if err != nil {
  56. return err
  57. }
  58. for _, node := range resp.ClusterNodes {
  59. filers = append(filers, node.Address)
  60. }
  61. return nil
  62. })
  63. if err != nil {
  64. currentMaster := s.masterClient.GetMaster(context.Background())
  65. glog.Warningf("Failed to discover filers from master %s: %v", currentMaster, err)
  66. // Return cached filers even if expired, better than nothing
  67. return s.cachedFilers
  68. }
  69. // Update cache
  70. s.cachedFilers = filers
  71. s.lastFilerUpdate = time.Now()
  72. return filers
  73. }
  74. // GetAllFilers returns all discovered filers
  75. func (s *AdminServer) GetAllFilers() []string {
  76. return s.getDiscoveredFilers()
  77. }