filer_server.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/stats"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
  21. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
  22. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra2"
  23. _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
  24. _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
  25. _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
  26. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  27. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
  28. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
  29. _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
  30. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
  31. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
  32. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
  33. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
  34. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
  35. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
  36. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
  37. _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
  38. _ "github.com/seaweedfs/seaweedfs/weed/filer/tarantool"
  39. _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
  40. "github.com/seaweedfs/seaweedfs/weed/glog"
  41. "github.com/seaweedfs/seaweedfs/weed/notification"
  42. _ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
  43. _ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
  44. _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
  45. _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
  46. _ "github.com/seaweedfs/seaweedfs/weed/notification/log"
  47. _ "github.com/seaweedfs/seaweedfs/weed/notification/webhook"
  48. "github.com/seaweedfs/seaweedfs/weed/security"
  49. )
  50. type FilerOption struct {
  51. Masters *pb.ServerDiscovery
  52. FilerGroup string
  53. Collection string
  54. DefaultReplication string
  55. DisableDirListing bool
  56. MaxMB int
  57. DirListingLimit int
  58. DataCenter string
  59. Rack string
  60. DataNode string
  61. DefaultLevelDbDir string
  62. DisableHttp bool
  63. Host pb.ServerAddress
  64. recursiveDelete bool
  65. Cipher bool
  66. SaveToFilerLimit int64
  67. ConcurrentUploadLimit int64
  68. ShowUIDirectoryDelete bool
  69. DownloadMaxBytesPs int64
  70. DiskType string
  71. AllowedOrigins []string
  72. ExposeDirectoryData bool
  73. }
  74. type FilerServer struct {
  75. inFlightDataSize int64
  76. listenersWaits int64
  77. // notifying clients
  78. listenersLock sync.Mutex
  79. listenersCond *sync.Cond
  80. inFlightDataLimitCond *sync.Cond
  81. filer_pb.UnimplementedSeaweedFilerServer
  82. option *FilerOption
  83. secret security.SigningKey
  84. filer *filer.Filer
  85. filerGuard *security.Guard
  86. volumeGuard *security.Guard
  87. grpcDialOption grpc.DialOption
  88. // metrics read from the master
  89. metricsAddress string
  90. metricsIntervalSec int
  91. // track known metadata listeners
  92. knownListenersLock sync.Mutex
  93. knownListeners map[int32]int32
  94. }
  95. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  96. v := util.GetViper()
  97. signingKey := v.GetString("jwt.filer_signing.key")
  98. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  99. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  100. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  101. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  102. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  103. volumeSigningKey := v.GetString("jwt.signing.key")
  104. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  105. volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  106. volumeReadSigningKey := v.GetString("jwt.signing.read.key")
  107. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  108. volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  109. v.SetDefault("cors.allowed_origins.values", "*")
  110. allowedOrigins := v.GetString("cors.allowed_origins.values")
  111. domains := strings.Split(allowedOrigins, ",")
  112. option.AllowedOrigins = domains
  113. v.SetDefault("filer.expose_directory_metadata.enabled", true)
  114. returnDirMetadata := v.GetBool("filer.expose_directory_metadata.enabled")
  115. option.ExposeDirectoryData = returnDirMetadata
  116. fs = &FilerServer{
  117. option: option,
  118. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  119. knownListeners: make(map[int32]int32),
  120. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  121. }
  122. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  123. option.Masters.RefreshBySrvIfAvailable()
  124. if len(option.Masters.GetInstances()) == 0 {
  125. glog.Fatal("master list is required!")
  126. }
  127. if !util.LoadConfiguration("filer", false) {
  128. v.SetDefault("leveldb2.enabled", true)
  129. v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
  130. _, err := os.Stat(option.DefaultLevelDbDir)
  131. if os.IsNotExist(err) {
  132. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  133. }
  134. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  135. } else {
  136. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  137. }
  138. util.LoadConfiguration("notification", false)
  139. v.SetDefault("filer.options.max_file_name_length", 255)
  140. maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
  141. glog.V(0).Infof("max_file_name_length %d", maxFilenameLength)
  142. fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
  143. if atomic.LoadInt64(&fs.listenersWaits) > 0 {
  144. fs.listenersCond.Broadcast()
  145. }
  146. })
  147. fs.filer.Cipher = option.Cipher
  148. whiteList := util.StringSplit(v.GetString("guard.white_list"), ",")
  149. fs.filerGuard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  150. fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec)
  151. fs.checkWithMaster()
  152. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  153. go fs.filer.KeepMasterClientConnected(context.Background())
  154. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  155. v.SetDefault("filer.options.buckets_folder", "/buckets")
  156. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  157. // TODO deprecated, will be removed after 2020-12-31
  158. // replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
  159. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  160. isFresh := fs.filer.LoadConfiguration(v)
  161. notification.LoadConfiguration(v, "notification.")
  162. handleStaticResources(defaultMux)
  163. if !option.DisableHttp {
  164. defaultMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler))
  165. defaultMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.filerHandler)))
  166. }
  167. if defaultMux != readonlyMux {
  168. handleStaticResources(readonlyMux)
  169. readonlyMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler))
  170. readonlyMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.readonlyFilerHandler)))
  171. }
  172. existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
  173. startFromTime := time.Now().Add(-filer.LogFlushInterval)
  174. if isFresh {
  175. glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
  176. if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
  177. glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
  178. }
  179. }
  180. fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
  181. fs.filer.LoadFilerConf()
  182. fs.filer.LoadRemoteStorageConfAndMapping()
  183. grace.OnReload(fs.Reload)
  184. grace.OnInterrupt(func() {
  185. fs.filer.Shutdown()
  186. })
  187. fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
  188. return fs, nil
  189. }
  190. func (fs *FilerServer) checkWithMaster() {
  191. isConnected := false
  192. for !isConnected {
  193. fs.option.Masters.RefreshBySrvIfAvailable()
  194. for _, master := range fs.option.Masters.GetInstances() {
  195. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  196. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  197. if err != nil {
  198. return fmt.Errorf("get master %s configuration: %v", master, err)
  199. }
  200. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  201. return nil
  202. })
  203. if readErr == nil {
  204. isConnected = true
  205. } else {
  206. time.Sleep(7 * time.Second)
  207. }
  208. }
  209. }
  210. }
  211. func (fs *FilerServer) Reload() {
  212. glog.V(0).Infoln("Reload filer server...")
  213. util.LoadConfiguration("security", false)
  214. v := util.GetViper()
  215. fs.filerGuard.UpdateWhiteList(util.StringSplit(v.GetString("guard.white_list"), ","))
  216. }