volume_server.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package weed_server
  2. import (
  3. "net/http"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "google.golang.org/grpc"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. "github.com/seaweedfs/seaweedfs/weed/storage"
  15. )
  16. type VolumeServer struct {
  17. volume_server_pb.UnimplementedVolumeServerServer
  18. inFlightUploadDataSize int64
  19. inFlightDownloadDataSize int64
  20. concurrentUploadLimit int64
  21. concurrentDownloadLimit int64
  22. inFlightUploadDataLimitCond *sync.Cond
  23. inFlightDownloadDataLimitCond *sync.Cond
  24. inflightUploadDataTimeout time.Duration
  25. inflightDownloadDataTimeout time.Duration
  26. hasSlowRead bool
  27. readBufferSizeMB int
  28. SeedMasterNodes []pb.ServerAddress
  29. whiteList []string
  30. currentMaster pb.ServerAddress
  31. pulseSeconds int
  32. dataCenter string
  33. rack string
  34. store *storage.Store
  35. guard *security.Guard
  36. grpcDialOption grpc.DialOption
  37. needleMapKind storage.NeedleMapKind
  38. ldbTimout int64
  39. FixJpgOrientation bool
  40. ReadMode string
  41. compactionBytePerSecond int64
  42. metricsAddress string
  43. metricsIntervalSec int
  44. fileSizeLimitBytes int64
  45. isHeartbeating bool
  46. stopChan chan bool
  47. }
  48. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  49. port int, grpcPort int, publicUrl string,
  50. folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  51. idxFolder string,
  52. needleMapKind storage.NeedleMapKind,
  53. masterNodes []pb.ServerAddress, pulseSeconds int,
  54. dataCenter string, rack string,
  55. whiteList []string,
  56. fixJpgOrientation bool,
  57. readMode string,
  58. compactionMBPerSecond int,
  59. fileSizeLimitMB int,
  60. concurrentUploadLimit int64,
  61. concurrentDownloadLimit int64,
  62. inflightUploadDataTimeout time.Duration,
  63. inflightDownloadDataTimeout time.Duration,
  64. hasSlowRead bool,
  65. readBufferSizeMB int,
  66. ldbTimeout int64,
  67. ) *VolumeServer {
  68. v := util.GetViper()
  69. signingKey := v.GetString("jwt.signing.key")
  70. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  71. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  72. enableUiAccess := v.GetBool("access.ui")
  73. readSigningKey := v.GetString("jwt.signing.read.key")
  74. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  75. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  76. vs := &VolumeServer{
  77. pulseSeconds: pulseSeconds,
  78. dataCenter: dataCenter,
  79. rack: rack,
  80. needleMapKind: needleMapKind,
  81. FixJpgOrientation: fixJpgOrientation,
  82. ReadMode: readMode,
  83. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  84. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  85. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  86. isHeartbeating: true,
  87. stopChan: make(chan bool),
  88. inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  89. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  90. concurrentUploadLimit: concurrentUploadLimit,
  91. concurrentDownloadLimit: concurrentDownloadLimit,
  92. inflightUploadDataTimeout: inflightUploadDataTimeout,
  93. inflightDownloadDataTimeout: inflightDownloadDataTimeout,
  94. hasSlowRead: hasSlowRead,
  95. readBufferSizeMB: readBufferSizeMB,
  96. ldbTimout: ldbTimeout,
  97. whiteList: whiteList,
  98. }
  99. whiteList = append(whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)
  100. vs.SeedMasterNodes = masterNodes
  101. vs.checkWithMaster()
  102. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
  103. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  104. handleStaticResources(adminMux)
  105. adminMux.HandleFunc("/status", requestIDMiddleware(vs.statusHandler))
  106. adminMux.HandleFunc("/healthz", requestIDMiddleware(vs.healthzHandler))
  107. if signingKey == "" || enableUiAccess {
  108. // only expose the volume server details for safe environments
  109. adminMux.HandleFunc("/ui/index.html", requestIDMiddleware(vs.uiStatusHandler))
  110. /*
  111. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  112. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  113. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  114. */
  115. }
  116. adminMux.HandleFunc("/", requestIDMiddleware(vs.privateStoreHandler))
  117. if publicMux != adminMux {
  118. // separated admin and public port
  119. handleStaticResources(publicMux)
  120. publicMux.HandleFunc("/", requestIDMiddleware(vs.publicReadOnlyHandler))
  121. }
  122. stats.VolumeServerConcurrentDownloadLimit.Set(float64(vs.concurrentDownloadLimit))
  123. stats.VolumeServerConcurrentUploadLimit.Set(float64(vs.concurrentUploadLimit))
  124. go vs.heartbeat()
  125. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  126. return vs
  127. }
  128. func (vs *VolumeServer) SetStopping() {
  129. glog.V(0).Infoln("Stopping volume server...")
  130. vs.store.SetStopping()
  131. }
  132. func (vs *VolumeServer) LoadNewVolumes() {
  133. glog.V(0).Infoln(" Loading new volume ids ...")
  134. vs.store.LoadNewVolumes()
  135. }
  136. func (vs *VolumeServer) Shutdown() {
  137. glog.V(0).Infoln("Shutting down volume server...")
  138. vs.store.Close()
  139. glog.V(0).Infoln("Shut down successfully!")
  140. }
  141. func (vs *VolumeServer) Reload() {
  142. glog.V(0).Infoln("Reload volume server...")
  143. util.LoadConfiguration("security", false)
  144. v := util.GetViper()
  145. vs.guard.UpdateWhiteList(append(vs.whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...))
  146. }