volume_grpc_admin.go 11 KB


  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/util/version"
  8. "github.com/seaweedfs/seaweedfs/weed/storage"
  9. "github.com/seaweedfs/seaweedfs/weed/cluster"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/stats"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. )
  20. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  21. resp := &volume_server_pb.DeleteCollectionResponse{}
  22. err := vs.store.DeleteCollection(req.Collection)
  23. if err != nil {
  24. glog.Errorf("delete collection %s: %v", req.Collection, err)
  25. } else {
  26. glog.V(2).Infof("delete collection %v", req)
  27. }
  28. return resp, err
  29. }
  30. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  31. resp := &volume_server_pb.AllocateVolumeResponse{}
  32. err := vs.store.AddVolume(
  33. needle.VolumeId(req.VolumeId),
  34. req.Collection,
  35. vs.needleMapKind,
  36. req.Replication,
  37. req.Ttl,
  38. req.Preallocate,
  39. needle.Version(req.Version),
  40. req.MemoryMapMaxSizeMb,
  41. types.ToDiskType(req.DiskType),
  42. vs.ldbTimout,
  43. )
  44. if err != nil {
  45. glog.Errorf("assign volume %v: %v", req, err)
  46. } else {
  47. glog.V(2).Infof("assign volume %v", req)
  48. }
  49. return resp, err
  50. }
  51. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  52. resp := &volume_server_pb.VolumeMountResponse{}
  53. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  54. if err != nil {
  55. glog.Errorf("volume mount %v: %v", req, err)
  56. } else {
  57. glog.V(2).Infof("volume mount %v", req)
  58. }
  59. return resp, err
  60. }
  61. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  62. resp := &volume_server_pb.VolumeUnmountResponse{}
  63. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  64. if err != nil {
  65. glog.Errorf("volume unmount %v: %v", req, err)
  66. } else {
  67. glog.V(2).Infof("volume unmount %v", req)
  68. }
  69. return resp, err
  70. }
  71. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  72. resp := &volume_server_pb.VolumeDeleteResponse{}
  73. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty)
  74. if err != nil {
  75. glog.Errorf("volume delete %v: %v", req, err)
  76. } else {
  77. glog.V(2).Infof("volume delete %v", req)
  78. }
  79. return resp, err
  80. }
  81. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  82. resp := &volume_server_pb.VolumeConfigureResponse{}
  83. // check replication format
  84. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  85. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  86. return resp, nil
  87. }
  88. // unmount
  89. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  90. glog.Errorf("volume configure unmount %v: %v", req, err)
  91. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  92. return resp, nil
  93. }
  94. // modify the volume info file
  95. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  96. glog.Errorf("volume configure %v: %v", req, err)
  97. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  98. return resp, nil
  99. }
  100. // mount
  101. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  102. glog.Errorf("volume configure mount %v: %v", req, err)
  103. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  104. return resp, nil
  105. }
  106. return resp, nil
  107. }
  108. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  109. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  110. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  111. if v == nil {
  112. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  113. }
  114. // step 1: stop master from redirecting traffic here
  115. if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
  116. return resp, err
  117. }
  118. // rare case 1.5: it will be unlucky if heartbeat happened between step 1 and 2.
  119. // step 2: mark local volume as readonly
  120. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId), req.GetPersist())
  121. if err != nil {
  122. glog.Errorf("volume mark readonly %v: %v", req, err)
  123. } else {
  124. glog.V(2).Infof("volume mark readonly %v", req)
  125. }
  126. // step 3: tell master from redirecting traffic here again, to prevent rare case 1.5
  127. if err := vs.notifyMasterVolumeReadonly(v, true); err != nil {
  128. return resp, err
  129. }
  130. return resp, err
  131. }
  132. func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
  133. if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  134. _, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
  135. Ip: vs.store.Ip,
  136. Port: uint32(vs.store.Port),
  137. VolumeId: uint32(v.Id),
  138. Collection: v.Collection,
  139. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  140. Ttl: v.Ttl.ToUint32(),
  141. DiskType: string(v.DiskType()),
  142. IsReadonly: isReadOnly,
  143. })
  144. if err != nil {
  145. return fmt.Errorf("set volume %d to read only on master: %v", v.Id, err)
  146. }
  147. return nil
  148. }); grpcErr != nil {
  149. glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
  150. return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(context.Background()), grpcErr)
  151. }
  152. return nil
  153. }
  154. func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
  155. resp := &volume_server_pb.VolumeMarkWritableResponse{}
  156. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  157. if v == nil {
  158. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  159. }
  160. err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
  161. if err != nil {
  162. glog.Errorf("volume mark writable %v: %v", req, err)
  163. } else {
  164. glog.V(2).Infof("volume mark writable %v", req)
  165. }
  166. // enable master to redirect traffic here
  167. if err := vs.notifyMasterVolumeReadonly(v, false); err != nil {
  168. return resp, err
  169. }
  170. return resp, err
  171. }
  172. func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
  173. resp := &volume_server_pb.VolumeStatusResponse{}
  174. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  175. if v == nil {
  176. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  177. }
  178. if v.DataBackend == nil {
  179. return nil, fmt.Errorf("volume %d data backend not found", req.VolumeId)
  180. }
  181. volumeSize, _, _ := v.DataBackend.GetStat()
  182. resp.IsReadOnly = v.IsReadOnly()
  183. resp.VolumeSize = uint64(volumeSize)
  184. resp.FileCount = v.FileCount()
  185. resp.FileDeletedCount = v.DeletedCount()
  186. return resp, nil
  187. }
  188. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
  189. resp := &volume_server_pb.VolumeServerStatusResponse{
  190. MemoryStatus: stats.MemStat(),
  191. Version: version.Version(),
  192. DataCenter: vs.dataCenter,
  193. Rack: vs.rack,
  194. }
  195. for _, loc := range vs.store.Locations {
  196. if dir, e := filepath.Abs(loc.Directory); e == nil {
  197. resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
  198. }
  199. }
  200. return resp, nil
  201. }
  202. func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
  203. resp := &volume_server_pb.VolumeServerLeaveResponse{}
  204. vs.StopHeartbeat()
  205. return resp, nil
  206. }
  207. func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
  208. resp := &volume_server_pb.VolumeNeedleStatusResponse{}
  209. volumeId := needle.VolumeId(req.VolumeId)
  210. n := &needle.Needle{
  211. Id: types.NeedleId(req.NeedleId),
  212. }
  213. var count int
  214. var err error
  215. hasVolume := vs.store.HasVolume(volumeId)
  216. if !hasVolume {
  217. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  218. if !hasEcVolume {
  219. return nil, fmt.Errorf("volume not found %d", req.VolumeId)
  220. }
  221. count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
  222. } else {
  223. count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
  224. }
  225. if err != nil {
  226. return nil, err
  227. }
  228. if count < 0 {
  229. return nil, fmt.Errorf("needle not found %d", n.Id)
  230. }
  231. resp.NeedleId = uint64(n.Id)
  232. resp.Cookie = uint32(n.Cookie)
  233. resp.Size = uint32(n.Size)
  234. resp.LastModified = n.LastModified
  235. resp.Crc = n.Checksum.Value()
  236. if n.HasTtl() {
  237. resp.Ttl = n.Ttl.String()
  238. }
  239. return resp, nil
  240. }
  241. func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
  242. resp = &volume_server_pb.PingResponse{
  243. StartTimeNs: time.Now().UnixNano(),
  244. }
  245. if req.TargetType == cluster.FilerType {
  246. pingErr = pb.WithFilerClient(false, 0, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  247. pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
  248. if pingResp != nil {
  249. resp.RemoteTimeNs = pingResp.StartTimeNs
  250. }
  251. return err
  252. })
  253. }
  254. if req.TargetType == cluster.VolumeServerType {
  255. pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  256. pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
  257. if pingResp != nil {
  258. resp.RemoteTimeNs = pingResp.StartTimeNs
  259. }
  260. return err
  261. })
  262. }
  263. if req.TargetType == cluster.MasterType {
  264. pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
  265. pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
  266. if pingResp != nil {
  267. resp.RemoteTimeNs = pingResp.StartTimeNs
  268. }
  269. return err
  270. })
  271. }
  272. if pingErr != nil {
  273. pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
  274. }
  275. resp.StopTimeNs = time.Now().UnixNano()
  276. return
  277. }