filer_client_accessor.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package filer_client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "google.golang.org/grpc"
  9. )
  10. type FilerClientAccessor struct {
  11. GetFiler func() pb.ServerAddress
  12. GetGrpcDialOption func() grpc.DialOption
  13. }
  14. func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  15. return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
  16. }
  17. func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
  18. glog.V(0).Infof("save conf for topic %v to filer", t)
  19. // save the topic configuration on filer
  20. return fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  21. return t.WriteConfFile(client, conf)
  22. })
  23. }
  24. func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
  25. glog.V(1).Infof("load conf for topic %v from filer", t)
  26. if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  27. conf, err = t.ReadConfFile(client)
  28. return err
  29. }); err != nil {
  30. return nil, err
  31. }
  32. return conf, nil
  33. }
  34. // ReadTopicConfFromFilerWithMetadata reads topic configuration along with file creation and modification times
  35. func (fca *FilerClientAccessor) ReadTopicConfFromFilerWithMetadata(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, createdAtNs, modifiedAtNs int64, err error) {
  36. glog.V(1).Infof("load conf with metadata for topic %v from filer", t)
  37. if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  38. conf, createdAtNs, modifiedAtNs, err = t.ReadConfFileWithMetadata(client)
  39. return err
  40. }); err != nil {
  41. return nil, 0, 0, err
  42. }
  43. return conf, createdAtNs, modifiedAtNs, nil
  44. }